[ https://issues.apache.org/jira/browse/KAFKA-18723?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

José Armando García Sancio updated KAFKA-18723:
-----------------------------------------------
    Description: 
It is possible for a KRaft replica to send corrupted records to the fetching 
replicas in the FETCH response. This is because there is a race between when 
the FETCH response is generated by the KRaft IO thread and when the network 
thread, or the Linux kernel, reads the bytes at the referenced position in the 
log segment.

This race can generate corrupted records if the KRaft replica performs a 
truncation after the FETCH response is created but before the network thread 
reads the bytes from the log segment.

I have seen the following errors:
{code:java}
[ERROR] 2025-01-07 15:04:18,273 [kafka-0-raft-io-thread] org.apache.kafka.server.fault.ProcessTerminatingFaultHandler handleFault - Encountered fatal fault: Unexpected error in raft IO thread
org.apache.kafka.common.KafkaException: Append failed unexpectedly
        at kafka.raft.KafkaMetadataLog.handleAndConvertLogAppendInfo(KafkaMetadataLog.scala:117)
        at kafka.raft.KafkaMetadataLog.appendAsFollower(KafkaMetadataLog.scala:110)
        at org.apache.kafka.raft.KafkaRaftClient.appendAsFollower(KafkaRaftClient.java:1227)
        at org.apache.kafka.raft.KafkaRaftClient.handleFetchResponse(KafkaRaftClient.java:1209)
        at org.apache.kafka.raft.KafkaRaftClient.handleResponse(KafkaRaftClient.java:1644)
        at org.apache.kafka.raft.KafkaRaftClient.handleInboundMessage(KafkaRaftClient.java:1770)
        at org.apache.kafka.raft.KafkaRaftClient.poll(KafkaRaftClient.java:2355)
        at kafka.raft.KafkaRaftManager$RaftIoThread.doWork(RaftManager.scala:71)
        at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:138){code}
and
{code:java}
[ERROR] 2025-01-07 18:06:20,121 [kafka-1-raft-io-thread] org.apache.kafka.server.fault.ProcessTerminatingFaultHandler handleFault - Encountered fatal fault: Unexpected error in raft IO thread
org.apache.kafka.common.errors.CorruptRecordException: Record size 0 is less than the minimum record overhead (14){code}
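The second failure comes from the record-batch framing check that runs while 
parsing the fetched bytes: a segment truncated mid-read can hand the reader 
zeroed bytes, so the parsed batch size is 0. Below is a minimal, 
self-contained sketch of that kind of check. The constants mirror Kafka's log 
format (8-byte base offset plus 4-byte size header, 14-byte minimum legacy 
record overhead), but the class and method names are illustrative, not the 
actual Kafka implementation.
{code:java}
import java.nio.ByteBuffer;

// Illustrative stand-in for the framing check in Kafka's log input stream;
// names and structure are simplified, not the real implementation.
public class BatchFramingCheckSketch {
    static final int LOG_OVERHEAD = 12;        // base offset (8 bytes) + size (4 bytes)
    static final int MIN_RECORD_OVERHEAD = 14; // minimum legacy record overhead

    static void readNextBatch(ByteBuffer buffer) {
        if (buffer.remaining() < LOG_OVERHEAD) {
            return; // incomplete header; wait for more bytes
        }
        long baseOffset = buffer.getLong(); // base offset of the batch
        int recordSize = buffer.getInt();   // claimed size of the batch payload
        // If the leader truncated the segment after building the FETCH
        // response, the fetched bytes can be zeroed, the claimed size
        // parses as 0, and this sanity check fails.
        if (recordSize < MIN_RECORD_OVERHEAD) {
            throw new RuntimeException("Record size " + recordSize
                + " is less than the minimum record overhead (" + MIN_RECORD_OVERHEAD + ")");
        }
    }

    public static void main(String[] args) {
        // 16 zeroed bytes stand in for the tail of a truncated segment.
        readNextBatch(ByteBuffer.allocate(16));
    }
}
{code}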
This race also exists for Kafka's ISR-based topic partitions. In that case the 
replica fetcher catches CorruptRecordException and InvalidRecordException and 
treats them as transient errors:
{code:scala}
} catch {
  case ime@(_: CorruptRecordException | _: InvalidRecordException) =>
    // we log the error and continue. This ensures two things
    // 1. If there is a corrupt message in a topic partition, it does not bring the fetcher thread
    //    down and cause other topic partition to also lag
    // 2. If the message is corrupt due to a transient state in the log (truncation, partial writes
    //    can cause this), we simply continue and should get fixed in the subsequent fetches
    error(s"Found invalid messages during fetch for partition $topicPartition " +
      s"offset ${currentFetchState.fetchOffset}", ime)
    partitionsWithError += topicPartition
{code}
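Adding the partition to partitionsWithError makes the fetcher back off and 
retry that partition on a later iteration, so a transient corruption heals on 
a subsequent fetch instead of killing the thread.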
The KRaft implementation doesn't handle this case:
{code:java}
} else {
    Records records = FetchResponse.recordsOrFail(partitionResponse);
    if (records.sizeInBytes() > 0) {
        appendAsFollower(records);
    }
    OptionalLong highWatermark = partitionResponse.highWatermark() < 0 ?
        OptionalLong.empty() :
        OptionalLong.of(partitionResponse.highWatermark());
    updateFollowerHighWatermark(state, highWatermark);
}{code}
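One possible direction, sketched below, is for the KRaft follower to mirror 
the replica fetcher: catch CorruptRecordException and InvalidRecordException 
around the append and treat them as transient, letting the next FETCH retry 
from the same offset. This is only a sketch against the snippet above; the 
surrounding identifiers come from that snippet, and the logger call and 
control flow are assumptions, not the actual fix.
{code:java}
} else {
    Records records = FetchResponse.recordsOrFail(partitionResponse);
    boolean appended = true;
    if (records.sizeInBytes() > 0) {
        try {
            appendAsFollower(records);
        } catch (CorruptRecordException | InvalidRecordException e) {
            // The leader may have truncated the segment between building the
            // FETCH response and the network thread reading the bytes. Treat
            // the corruption as transient: skip this response and let the
            // next fetch retry from the same offset instead of letting the
            // fault handler terminate the raft IO thread.
            logger.info("Ignoring corrupted records in FETCH response; will refetch", e);
            appended = false;
        }
    }
    if (appended) {
        OptionalLong highWatermark = partitionResponse.highWatermark() < 0 ?
            OptionalLong.empty() :
            OptionalLong.of(partitionResponse.highWatermark());
        updateFollowerHighWatermark(state, highWatermark);
    }
}
{code}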

> KRaft must handle corrupted records in the fetch response
> ---------------------------------------------------------
>
>                 Key: KAFKA-18723
>                 URL: https://issues.apache.org/jira/browse/KAFKA-18723
>             Project: Kafka
>          Issue Type: Bug
>          Components: kraft
>            Reporter: José Armando García Sancio
>            Assignee: José Armando García Sancio
>            Priority: Major
>             Fix For: 3.9.1, 3.8.2, 3.7.3
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
