[ 
https://issues.apache.org/jira/browse/KAFKA-18723?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

José Armando García Sancio updated KAFKA-18723:
-----------------------------------------------
    Fix Version/s: 4.0.1

> KRaft must handle corrupted records in the fetch response
> ---------------------------------------------------------
>
>                 Key: KAFKA-18723
>                 URL: https://issues.apache.org/jira/browse/KAFKA-18723
>             Project: Kafka
>          Issue Type: Bug
>          Components: kraft
>            Reporter: José Armando García Sancio
>            Assignee: José Armando García Sancio
>            Priority: Major
>             Fix For: 3.9.1, 3.8.2, 3.7.3, 4.0.1
>
>
> It is possible for a KRaft replica to send corrupted records to the fetching 
> replicas in the FETCH response. This is because there is a race between when 
> the FETCH response gets generated by the KRaft IO thread and when the network 
> thread, or linux kernel, reads the byte position in the log segment.
> This race can generated corrupted records if the KRaft replica performed a 
> truncation after the FETCH response was created but before the network thread 
> read the bytes from the log segment.
> I have seen the following errors:
> {code:java}
>  [ERROR] 2025-01-07 15:04:18,273 [kafka-0-raft-io-thread] 
> org.apache.kafka.server.fault.ProcessTerminatingFaultHandler handleFault - 
> Encountered fatal fault: Unexpected error in raft IO thread
> org.apache.kafka.common.KafkaException: Append failed unexpectedly
>       at 
> kafka.raft.KafkaMetadataLog.handleAndConvertLogAppendInfo(KafkaMetadataLog.scala:117)
>       at 
> kafka.raft.KafkaMetadataLog.appendAsFollower(KafkaMetadataLog.scala:110)
>       at 
> org.apache.kafka.raft.KafkaRaftClient.appendAsFollower(KafkaRaftClient.java:1227)
>       at 
> org.apache.kafka.raft.KafkaRaftClient.handleFetchResponse(KafkaRaftClient.java:1209)
>       at 
> org.apache.kafka.raft.KafkaRaftClient.handleResponse(KafkaRaftClient.java:1644)
>       at 
> org.apache.kafka.raft.KafkaRaftClient.handleInboundMessage(KafkaRaftClient.java:1770)
>       at org.apache.kafka.raft.KafkaRaftClient.poll(KafkaRaftClient.java:2355)
>       at kafka.raft.KafkaRaftManager$RaftIoThread.doWork(RaftManager.scala:71)
>       at 
> org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:138){code}
> and
> {code:java}
>  [ERROR] 2025-01-07 18:06:20,121 [kafka-1-raft-io-thread] 
> org.apache.kafka.server.fault.ProcessTerminatingFaultHandler handleFault - 
> Encountered fatal fault: Unexpected error in raft IO thread"
> org.apache.kafka.common.errors.CorruptRecordException: Record size 0 is less 
> than the minimum record overhead (14)"{code}
> This race also exists with Kafka's ISR based topic partition. In that case 
> the replica fetcher catches all CorruptRecordException and 
> InvalidRecordException.
> {code:java}
>                     } catch {
>                       case ime@(_: CorruptRecordException | _: 
> InvalidRecordException) =>
>                         // we log the error and continue. This ensures two 
> things
>                         // 1. If there is a corrupt message in a topic 
> partition, it does not bring the fetcher thread
>                         //    down and cause other topic partition to also lag
>                         // 2. If the message is corrupt due to a transient 
> state in the log (truncation, partial writes
>                         //    can cause this), we simply continue and should 
> get fixed in the subsequent fetches
>                         error(s"Found invalid messages during fetch for 
> partition $topicPartition " +
>                           s"offset ${currentFetchState.fetchOffset}", ime)
>                         partitionsWithError += topicPartition
>  {code}
> The KRaft implementation doesn't handle this case:
> {code:java}
>               } else {
>                   Records records = 
> FetchResponse.recordsOrFail(partitionResponse);
>                   if (records.sizeInBytes() > 0) {
>                       appendAsFollower(records);
>                   }
>                   OptionalLong highWatermark = 
> partitionResponse.highWatermark() < 0 ?
>                       OptionalLong.empty() :
>                       OptionalLong.of(partitionResponse.highWatermark());
>                   updateFollowerHighWatermark(state, highWatermark);
>               }{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to