[ 
https://issues.apache.org/jira/browse/KAFKA-13468?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Haoze Wu updated KAFKA-13468:
-----------------------------
    Description: 
When the Kafka Log class (`core/src/main/scala/kafka/log/Log.scala`) is 
initialized, it may encounter an IOException in its `locally` block, e.g., when 
the log directory cannot be created because of a permission issue, or when 
`initializeLeaderEpochCache`, `initializePartitionMetadata`, etc. throw an 
IOException.
{code:java}
class Log(...) {
  // ...
  locally {
    // create the log directory if it doesn't exist
    Files.createDirectories(dir.toPath)

    initializeLeaderEpochCache()
    initializePartitionMetadata()

    val nextOffset = loadSegments()
    // ...
  }
  // ...
}{code}
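A minimal, self-contained sketch of this failure mode, assuming only the JDK: `SketchLog` is a hypothetical stand-in for `Log` that keeps just the `Files.createDirectories` step of the `locally` block. Because nothing in the constructor catches the exception, a failed directory creation surfaces as a raw IOException at the `new` call site. To make the failure deterministic, the sketch places the log directory under a regular file.

```scala
import java.io.{File, IOException}
import java.nio.file.{Files, Path}

// Hypothetical stand-in for kafka.log.Log: only the first step of the
// constructor's locally block is modeled.
class SketchLog(val dir: File) {
  locally {
    // Nothing here catches IOException, so it escapes the constructor
    // to whoever called `new SketchLog(...)`.
    Files.createDirectories(dir.toPath)
  }
}

// Force directory creation to fail: the "log dir" sits under a regular file.
val blocker: Path = Files.createTempFile("not-a-dir", ".tmp")
val badDir = new File(blocker.toFile, "topic-0")

val outcome: String =
  try { new SketchLog(badDir); "created" }
  catch { case e: IOException => s"IOException escaped: ${e.getClass.getName}" }
Files.deleteIfExists(blocker)

println(outcome)
```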
We found that the broker encountering the IOException prints a KafkaApi error 
log like the following and then carries on.
{code:java}
[2021-11-17 22:41:30,057] ERROR [KafkaApi-1] Error when handling request: 
clientId=1, correlationId=1, api=LEADER_AND_ISR, version=5, 
body=LeaderAndIsrRequestData(controllerId=1, controllerEpoch=1, 
brokerEpoch=4294967362, type=0, ungroupedPartitionStates=[], 
topicStates=[LeaderAndIsrTopicState(topicName='gray-2-0', 
topicId=573bAVHfRQeXApzAKevNIg, 
partitionStates=[LeaderAndIsrPartitionState(topicName='gray-2-0', 
partitionIndex=1, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1, 3], 
zkVersion=0, replicas=[1, 3], addingReplicas=[], removingReplicas=[], 
isNew=true)]), LeaderAndIsrTopicState(topicName='gray-1-0', 
topicId=12dW2FxLTiyKmGi41HhdZQ, 
partitionStates=[LeaderAndIsrPartitionState(topicName='gray-1-0', 
partitionIndex=1, controllerEpoch=1, leader=3, leaderEpoch=0, isr=[3, 1], 
zkVersion=0, replicas=[3, 1], addingReplicas=[], removingReplicas=[], 
isNew=true)]), LeaderAndIsrTopicState(topicName='gray-3-0', 
topicId=_yvmANyZSoK_PTV0e-nqCA, 
partitionStates=[LeaderAndIsrPartitionState(topicName='gray-3-0', 
partitionIndex=1, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1, 3], 
zkVersion=0, replicas=[1, 3], addingReplicas=[], removingReplicas=[], 
isNew=true)])], liveLeaders=[LeaderAndIsrLiveLeader(brokerId=1, 
hostName='localhost', port=9791), LeaderAndIsrLiveLeader(brokerId=3, 
hostName='localhost', port=9793)]) (kafka.server.RequestHandlerHelper) {code}
However, none of the consumers reading from the affected topics (“gray-2-0”, 
“gray-1-0”, “gray-3-0”) can make progress. These consumers log no errors 
related to this issue; they simply hang for more than 3 minutes.

The IOException sometimes affects multiple partitions of the `__consumer_offsets` topic:
{code:java}
[2021-11-18 10:57:41,289] ERROR [KafkaApi-1] Error when handling request: 
clientId=1, correlationId=11, api=LEADER_AND_ISR, version=5, 
body=LeaderAndIsrRequestData(controllerId=1, controllerEpoch=1, 
brokerEpoch=4294967355, type=0, ungroupedPartitionStates=[], 
topicStates=[LeaderAndIsrTopicState(topicName='__consumer_offsets', 
topicId=_MiMTCViS76osIyDdxekIg, 
partitionStates=[LeaderAndIsrPartitionState(topicName='__consumer_offsets', 
partitionIndex=15, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], 
zkVersion=0, replicas=[1], addingReplicas=[], removingReplicas=[], isNew=true), 
LeaderAndIsrPartitionState(topicName='__consumer_offsets', partitionIndex=48, 
controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], 
addingReplicas=[], removingReplicas=[], isNew=true), 
LeaderAndIsrPartitionState(topicName='__consumer_offsets', partitionIndex=45, 
controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], 
addingReplicas=[], removingReplicas=[], isNew=true), ...
addingReplicas=[], removingReplicas=[], isNew=true), 
LeaderAndIsrPartitionState(topicName='__consumer_offsets', partitionIndex=33, 
controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], 
addingReplicas=[], removingReplicas=[], isNew=true)])], 
liveLeaders=[LeaderAndIsrLiveLeader(brokerId=1, hostName='localhost', 
port=9791)]) (kafka.server.RequestHandlerHelper) {code}
*Analysis*

The key part of the stack trace (each entry is `class,method,line`) is as follows:
{code:java}
"java.lang.Thread,run,748",
"kafka.server.KafkaRequestHandler,run,74",
"kafka.server.KafkaApis,handle,236",
"kafka.server.KafkaApis,handleLeaderAndIsrRequest,258",
"kafka.server.ReplicaManager,becomeLeaderOrFollower,1411",
"kafka.server.ReplicaManager,makeLeaders,1566",
"scala.collection.mutable.HashMap,foreachEntry,499",
"scala.collection.mutable.HashMap$Node,foreachEntry,633",
"kafka.utils.Implicits$MapExtensionMethods$,$anonfun$forKeyValue$1,62",
"kafka.server.ReplicaManager,$anonfun$makeLeaders$5,1568",
"kafka.cluster.Partition,makeLeader,548",
"kafka.cluster.Partition,$anonfun$makeLeader$1,564",
"kafka.cluster.Partition,createLogIfNotExists,324",
"kafka.cluster.Partition,createLog,344",
"kafka.log.LogManager,getOrCreateLog,783",
"scala.Option,getOrElse,201",
"kafka.log.LogManager,$anonfun$getOrCreateLog$1,830",
"kafka.log.Log$,apply,2601",
"kafka.log.Log,<init>,323" {code}
In short, the IOException is not handled by Log; instead, it propagates all the 
way back to `core/src/main/scala/kafka/server/KafkaApis.scala`, where the 
generic `Throwable` case catches it:
{code:java}
  override def handle(request: RequestChannel.Request): Unit = {
    try {
      request.header.apiKey match {
        // ...
        case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
        // ...
      }
    } catch {
      case e: FatalExitError => throw e
      case e: Throwable => requestHelper.handleError(request, e)
    } finally {
      // ...
    }
  }
{code}
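The effect of this catch-all can be modeled in a few lines. `handleSketch` and the `FatalExitError` class below are hypothetical stand-ins (the real `handle` passes the exception to `requestHelper.handleError` rather than returning a string); the sketch only illustrates that any non-fatal `Throwable`, including a plain IOException from deep inside the `Log` constructor, ends up as one generic error for the whole request.

```scala
import java.io.IOException

// Hypothetical stand-in for Kafka's FatalExitError.
final class FatalExitError extends Error("fatal exit")

// Models the shape of KafkaApis.handle: fatal errors escape, while every
// other Throwable becomes a single error for the whole request.
def handleSketch(handler: () => Unit): String =
  try { handler(); "OK" }
  catch {
    case e: FatalExitError => throw e
    case e: Throwable      => s"request failed: ${e.getClass.getSimpleName}"
  }

println(handleSketch(() => ()))                                    // OK
println(handleSketch(() => throw new IOException("mkdir failed"))) // request failed: IOException
```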
I also noticed that `makeLeaders` in 
`core/src/main/scala/kafka/server/ReplicaManager.scala` has a relevant comment 
about an “unexpected error”, with a TODO:
{code:java}
  /*
   * Make the current broker to become leader for a given set of partitions by:
   *
   * 1. Stop fetchers for these partitions
   * 2. Update the partition metadata in cache
   * 3. Add these partitions to the leader partitions set
   *
   * If an unexpected error is thrown in this function, it will be propagated to KafkaApis where
   * the error message will be set on each partition since we do not know which partition caused it. Otherwise,
   * return the set of partitions that are made leader due to this method
   *
   *  TODO: the above may need to be fixed later
   */
  private def makeLeaders(...): Set[Partition] = {
    // ...
    try {
      // ...
      partitionStates.forKeyValue { (partition, partitionState) =>
        try {
          if (partition.makeLeader(partitionState, highWatermarkCheckpoints)) // line 1568
            partitionsToMakeLeaders += partition
          else
            stateChangeLogger.info(...)
        } catch {
          case e: KafkaStorageException =>
            stateChangeLogger.error(...)
            val dirOpt = getLogDir(partition.topicPartition)
            error(...)
            responseMap.put(partition.topicPartition, Errors.KAFKA_STORAGE_ERROR)
        }
      }
    } catch {
      case e: Throwable =>
        partitionStates.keys.foreach { partition =>
          stateChangeLogger.error(...)
        }
        // Re-throw the exception for it to be caught in KafkaApis
        throw e
    }
    // ...
  } {code}
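Why the exception type matters can be sketched with a simplified model of the loop above. `makeLeadersSketch`, the partition names, and the local `KafkaStorageException` class are hypothetical stand-ins, and the outer log-and-rethrow `catch` is omitted. A storage exception is recorded against its own partition and the loop moves on, whereas any other exception (such as a raw IOException) leaves the loop, so the remaining partitions are never made leader.

```scala
import java.io.IOException
import scala.collection.mutable

// Hypothetical stand-in for org.apache.kafka.common.errors.KafkaStorageException.
final class KafkaStorageException(msg: String, cause: Throwable)
    extends RuntimeException(msg, cause)

// Returns (partitions made leader, per-partition errors); any exception other
// than KafkaStorageException propagates and aborts the loop.
def makeLeadersSketch(partitions: Seq[String], makeLeader: String => Unit)
    : (Seq[String], Map[String, String]) = {
  val leaders = mutable.ListBuffer.empty[String]
  val errors = mutable.Map.empty[String, String]
  partitions.foreach { p =>
    try { makeLeader(p); leaders += p }
    catch { case _: KafkaStorageException => errors(p) = "KAFKA_STORAGE_ERROR" }
  }
  (leaders.toList, errors.toMap)
}

val parts = Seq("gray-1-0", "gray-2-0", "gray-3-0")

// Failure surfaced as a storage exception: only gray-2-0 is affected.
val (ok, errs) = makeLeadersSketch(parts, p =>
  if (p == "gray-2-0") throw new KafkaStorageException("log init failed", new IOException("mkdir failed")))
println(s"leaders=$ok errors=$errs")

// Same failure surfaced as a raw IOException: the loop aborts at gray-2-0.
val aborted: Boolean =
  try {
    makeLeadersSketch(parts, p => if (p == "gray-2-0") throw new IOException("mkdir failed"))
    false
  } catch { case _: IOException => true }
println(s"aborted=$aborted")
```

In the real code, which partitions are skipped depends on the iteration order of the `partitionStates` map.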
*Fix*

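To fix this issue, I think we should catch any IOException thrown while Log is 
initialized and rethrow it as a KafkaStorageException, as many other IOException 
handlers in Kafka already do, e.g., 
[https://github.com/apache/kafka/blob/ebb1d6e21cc9213071ee1c6a15ec3411fc215b81/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala#L92-L120]
Since `makeLeaders` already catches KafkaStorageException for each partition and 
records `Errors.KAFKA_STORAGE_ERROR` for that partition only, the other 
partitions in the LeaderAndIsr request would then still be processed.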
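A minimal sketch of the proposed fix, assuming the wrapping is placed around the whole `locally` block. `SketchLog` and the local `KafkaStorageException` class are hypothetical stand-ins (the real exception class is `org.apache.kafka.common.errors.KafkaStorageException`, and an actual patch might wrap a narrower region). What matters is that callers now receive the exception type that `makeLeaders` already handles per partition.

```scala
import java.io.{File, IOException}
import java.nio.file.Files

// Hypothetical stand-in for org.apache.kafka.common.errors.KafkaStorageException.
final class KafkaStorageException(msg: String, cause: Throwable)
    extends RuntimeException(msg, cause)

// Hypothetical stand-in for kafka.log.Log with the proposed wrapping.
class SketchLog(val dir: File) {
  locally {
    try {
      Files.createDirectories(dir.toPath)
      // initializeLeaderEpochCache(), initializePartitionMetadata(),
      // loadSegments(), etc. would follow here inside the same try.
    } catch {
      case e: IOException =>
        throw new KafkaStorageException(s"Failed to initialize log in ${dir.getAbsolutePath}", e)
    }
  }
}

// Force directory creation to fail: the "log dir" sits under a regular file.
val blocker = Files.createTempFile("not-a-dir", ".tmp")
val thrown: Throwable =
  try { new SketchLog(new File(blocker.toFile, "topic-0")); null }
  catch { case e: Exception => e }
Files.deleteIfExists(blocker)

println(s"storage exception: ${thrown.isInstanceOf[KafkaStorageException]}")
```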

With this fix applied, the symptoms described above disappear: the consumers no 
longer hang and go on to finish the remaining workload.

One question is whether we should also use 
`logDirFailureChannel.maybeAddOfflineLogDir` to handle the IOException, as in 
[https://github.com/apache/kafka/blob/ebb1d6e21cc9213071ee1c6a15ec3411fc215b81/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala#L92-L120]
and 
[https://github.com/apache/kafka/blob/ebb1d6e21cc9213071ee1c6a15ec3411fc215b81/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala#L126-L139].
If so, `logDirFailureChannel.maybeAddOfflineLogDir` would crash the node, 
according to the protocol in 
[https://github.com/apache/kafka/blob/ebb1d6e21cc9213071ee1c6a15ec3411fc215b81/core/src/main/scala/kafka/server/ReplicaManager.scala#L268-L277]
and 
[https://github.com/apache/kafka/blob/ebb1d6e21cc9213071ee1c6a15ec3411fc215b81/core/src/main/scala/kafka/server/ReplicaManager.scala#L327-L332]
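For comparison, the CheckpointFile-style handling referred to in this question can be sketched as follows. `SketchFailureChannel` is a hypothetical, much-simplified model of `LogDirFailureChannel`: it records each failed directory only once and queues it for a handler, and the caller then still throws a storage exception. The real channel also logs the message, and the follow-up handling lives in the `ReplicaManager` code linked above; none of that is modeled here.

```scala
import java.io.IOException
import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap}

// Hypothetical stand-in for org.apache.kafka.common.errors.KafkaStorageException.
final class KafkaStorageException(msg: String, cause: Throwable)
    extends RuntimeException(msg, cause)

// Hypothetical, simplified model of LogDirFailureChannel.
class SketchFailureChannel(capacity: Int) {
  private val offlineDirs = new ConcurrentHashMap[String, String]()
  private val queue = new ArrayBlockingQueue[String](capacity)

  // Queue a directory only the first time it is reported
  // (msg and e are accepted but unused in this sketch).
  def maybeAddOfflineLogDir(dir: String, msg: => String, e: IOException): Unit =
    if (offlineDirs.putIfAbsent(dir, dir) == null) queue.add(dir)

  def pendingDirs: Int = queue.size
}

// Pattern from the linked CheckpointFile code: report the dir, then throw a storage exception.
def initLog(dir: String, channel: SketchFailureChannel)(body: => Unit): Unit =
  try body
  catch {
    case e: IOException =>
      val msg = s"Error while creating log in $dir"
      channel.maybeAddOfflineLogDir(dir, msg, e)
      throw new KafkaStorageException(msg, e)
  }

val channel = new SketchFailureChannel(capacity = 8)
for (_ <- 1 to 2) {
  try initLog("/data/kafka-logs", channel)(throw new IOException("disk error"))
  catch { case _: KafkaStorageException => () }
}
println(channel.pendingDirs) // 1
```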

P.S. This issue affects Kafka versions 2.8.0 through 3.0.0. On the current trunk 
branch, `core/src/main/scala/kafka/log/Log.scala` has been renamed to 
`core/src/main/scala/kafka/log/UnifiedLog.scala` and the code has changed 
slightly, but the issue still exists, so we are submitting the pull request 
with the fix against trunk. We also propose applying the fix to 2.8.0, 3.0.0, 
etc. in a separate pull request.


> Consumers may hang because IOException in Log#<init> does not trigger KafkaStorageException
> -------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-13468
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13468
>             Project: Kafka
>          Issue Type: Bug
>          Components: log
>    Affects Versions: 2.8.0
>            Reporter: Haoze Wu
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
