[ 
https://issues.apache.org/jira/browse/KAFKA-13468?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya updated KAFKA-13468:
---------------------------------
    Affects Version/s: 3.0.0
                       4.0.0

> Consumers may hang because IOException in Log#<init> does not trigger 
> KafkaStorageException
> -------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-13468
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13468
>             Project: Kafka
>          Issue Type: Bug
>          Components: log
>    Affects Versions: 2.8.0, 3.0.0, 4.0.0
>            Reporter: Haoze Wu
>            Priority: Major
>
> When the Kafka Log class (`core/src/main/scala/kafka/log/Log.scala`) is 
> initialized, it may encounter an IOException in the `locally` block, e.g., 
> when the log directory cannot be created due to a permission issue, or when 
> `initializeLeaderEpochCache`, `initializePartitionMetadata`, etc. throw an 
> IOException.
> {code:java}
> class Log(...) {
>   // ...
>   locally {
>     // create the log directory if it doesn't exist
>     Files.createDirectories(dir.toPath)
>     initializeLeaderEpochCache()
>     initializePartitionMetadata()
>     val nextOffset = loadSegments()
>     // ...
>   }
>   // ...
> }{code}
> We found that the broker that encounters the IOException prints a KafkaApi 
> error log like the following and then continues running.
> {code:java}
> [2021-11-17 22:41:30,057] ERROR [KafkaApi-1] Error when handling request: 
> clientId=1, correlationId=1, api=LEADER_AND_ISR, version=5, 
> body=LeaderAndIsrRequestData(controllerId=1, controllerEpoch=1, 
> brokerEpoch=4294967362, type=0, ungroupedPartitionStates=[], 
> topicStates=[LeaderAndIsrTopicState(topicName='gray-2-0', 
> topicId=573bAVHfRQeXApzAKevNIg, 
> partitionStates=[LeaderAndIsrPartitionState(topicName='gray-2-0', 
> partitionIndex=1, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1, 3], 
> zkVersion=0, replicas=[1, 3], addingReplicas=[], removingReplicas=[], 
> isNew=true)]), LeaderAndIsrTopicState(topicName='gray-1-0', 
> topicId=12dW2FxLTiyKmGi41HhdZQ, 
> partitionStates=[LeaderAndIsrPartitionState(topicName='gray-1-0', 
> partitionIndex=1, controllerEpoch=1, leader=3, leaderEpoch=0, isr=[3, 1], 
> zkVersion=0, replicas=[3, 1], addingReplicas=[], removingReplicas=[], 
> isNew=true)]), LeaderAndIsrTopicState(topicName='gray-3-0', 
> topicId=_yvmANyZSoK_PTV0e-nqCA, 
> partitionStates=[LeaderAndIsrPartitionState(topicName='gray-3-0', 
> partitionIndex=1, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1, 3], 
> zkVersion=0, replicas=[1, 3], addingReplicas=[], removingReplicas=[], 
> isNew=true)])], liveLeaders=[LeaderAndIsrLiveLeader(brokerId=1, 
> hostName='localhost', port=9791), LeaderAndIsrLiveLeader(brokerId=3, 
> hostName='localhost', port=9793)]) (kafka.server.RequestHandlerHelper) {code}
> But all the consumers that are consuming data from the affected topics 
> (“gray-2-0”, “gray-1-0”, “gray-3-0”) are not able to proceed. These consumers 
> don’t have any error log related to this issue. They hang for more than 3 
> minutes.
> The IOException sometimes affects multiple partitions of the 
> `__consumer_offsets` topic:
> {code:java}
> [2021-11-18 10:57:41,289] ERROR [KafkaApi-1] Error when handling request: 
> clientId=1, correlationId=11, api=LEADER_AND_ISR, version=5, 
> body=LeaderAndIsrRequestData(controllerId=1, controllerEpoch=1, 
> brokerEpoch=4294967355, type=0, ungroupedPartitionStates=[], 
> topicStates=[LeaderAndIsrTopicState(topicName='__consumer_offsets', 
> topicId=_MiMTCViS76osIyDdxekIg, 
> partitionStates=[LeaderAndIsrPartitionState(topicName='__consumer_offsets', 
> partitionIndex=15, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], 
> zkVersion=0, replicas=[1], addingReplicas=[], removingReplicas=[], 
> isNew=true), LeaderAndIsrPartitionState(topicName='__consumer_offsets', 
> partitionIndex=48, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], 
> zkVersion=0, replicas=[1], addingReplicas=[], removingReplicas=[], 
> isNew=true), LeaderAndIsrPartitionState(topicName='__consumer_offsets', 
> partitionIndex=45, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], 
> zkVersion=0, replicas=[1], addingReplicas=[], removingReplicas=[], 
> isNew=true), ...
> addingReplicas=[], removingReplicas=[], isNew=true), 
> LeaderAndIsrPartitionState(topicName='__consumer_offsets', partitionIndex=33, 
> controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, 
> replicas=[1], addingReplicas=[], removingReplicas=[], isNew=true)])], 
> liveLeaders=[LeaderAndIsrLiveLeader(brokerId=1, hostName='localhost', 
> port=9791)]) (kafka.server.RequestHandlerHelper) {code}
> *Analysis*
> The key stacktrace is as follows:
> {code:java}
> "java.lang.Thread,run,748",
> "kafka.server.KafkaRequestHandler,run,74",
> "kafka.server.KafkaApis,handle,236",
> "kafka.server.KafkaApis,handleLeaderAndIsrRequest,258",
> "kafka.server.ReplicaManager,becomeLeaderOrFollower,1411",
> "kafka.server.ReplicaManager,makeLeaders,1566",
> "scala.collection.mutable.HashMap,foreachEntry,499",
> "scala.collection.mutable.HashMap$Node,foreachEntry,633",
> "kafka.utils.Implicits$MapExtensionMethods$,$anonfun$forKeyValue$1,62",
> "kafka.server.ReplicaManager,$anonfun$makeLeaders$5,1568",
> "kafka.cluster.Partition,makeLeader,548",
> "kafka.cluster.Partition,$anonfun$makeLeader$1,564",
> "kafka.cluster.Partition,createLogIfNotExists,324",
> "kafka.cluster.Partition,createLog,344",
> "kafka.log.LogManager,getOrCreateLog,783",
> "scala.Option,getOrElse,201",
> "kafka.log.LogManager,$anonfun$getOrCreateLog$1,830",
> "kafka.log.Log$,apply,2601",
> "kafka.log.Log,<init>,323" {code}
> Basically, the IOException is not handled by Log but instead gets 
> propagated all the way back to 
> `core/src/main/scala/kafka/server/KafkaApis.scala`:
> {code:java}
>   override def handle(request: RequestChannel.Request): Unit = {
>     try {
>       request.header.apiKey match {
>         // ...
>         case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
>         // ...
>       }
>     } catch {
>       case e: FatalExitError => throw e
>       case e: Throwable => requestHelper.handleError(request, e)
>     } finally {
>       // ...
>     }
>   }
> {code}
> I also noticed that ReplicaManager in 
> `core/src/main/scala/kafka/server/ReplicaManager.scala` has a relevant 
> comment about “unexpected error” with a TODO.
> {code:java}
>   /*
>    * Make the current broker to become leader for a given set of partitions by:
>    *
>    * 1. Stop fetchers for these partitions
>    * 2. Update the partition metadata in cache
>    * 3. Add these partitions to the leader partitions set
>    *
>    * If an unexpected error is thrown in this function, it will be propagated to KafkaApis where
>    * the error message will be set on each partition since we do not know which partition caused it. Otherwise,
>    * return the set of partitions that are made leader due to this method
>    *
>    *  TODO: the above may need to be fixed later
>    */
>   private def makeLeaders(...): Set[Partition] = {
>     // ...
>     try {
>       // ...
>       partitionStates.forKeyValue { (partition, partitionState) =>
>         try {
>           if (partition.makeLeader(partitionState, highWatermarkCheckpoints)) // line 1568
>             partitionsToMakeLeaders += partition
>           else
>             stateChangeLogger.info(...)
>         } catch {
>           case e: KafkaStorageException =>
>             stateChangeLogger.error(...)
>             val dirOpt = getLogDir(partition.topicPartition)
>             error(...)
>             responseMap.put(partition.topicPartition, Errors.KAFKA_STORAGE_ERROR)
>         }
>       }
>     } catch {
>       case e: Throwable =>
>         partitionStates.keys.foreach { partition =>
>           stateChangeLogger.error(...)
>         }
>         // Re-throw the exception for it to be caught in KafkaApis
>         throw e
>     }
>     // ...
>   } {code}
> *Fix*
> To fix this issue, I think we should catch the potential IOException when Log 
> is initialized, and then throw a KafkaStorageException, just like many other 
> IOException handlers in Kafka, e.g., 
> [https://github.com/apache/kafka/blob/2.8.0/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala#L92-L120]
>  
> After applying this fix, the aforementioned symptoms disappear, i.e., 
> the consumers no longer hang and proceed to finish the remaining workload.
> One question is whether we should also use 
> `logDirFailureChannel.maybeAddOfflineLogDir` to handle the IOException, like 
> [https://github.com/apache/kafka/blob/2.8.0/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala#L92-L120]
>  and 
> [https://github.com/apache/kafka/blob/2.8.0/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala#L126-L139]
>  . If so, `logDirFailureChannel.maybeAddOfflineLogDir` would crash the node 
> according to the protocol in 
> [https://github.com/apache/kafka/blob/2.8.0/core/src/main/scala/kafka/server/ReplicaManager.scala#L268-L277]
>  and 
> [https://github.com/apache/kafka/blob/2.8.0/core/src/main/scala/kafka/server/ReplicaManager.scala#L327-L332]
> P.S. This issue affects Kafka versions 2.8.0 through 3.0.0. In the current 
> trunk branch, `core/src/main/scala/kafka/log/Log.scala` has been renamed to 
> `core/src/main/scala/kafka/log/UnifiedLog.scala` with some small code 
> changes, but the issue still holds, so we are submitting a pull request with 
> the fix against trunk. We also propose applying the fix to versions 2.8.0 and 
> 3.0.0, etc., in a separate pull request.
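
The fix proposed in the quoted description can be illustrated with a minimal, self-contained sketch. It is not the actual Kafka patch: `StorageException` is a hypothetical local stand-in for `KafkaStorageException`, and `initializeLog` and `makeLeader` are illustrative names that only mimic the shape of the `locally` block in Log and the per-partition handler in `makeLeaders`. The point is that a handler matching only the storage exception lets a raw IOException escape, while a wrapped one is reported as a per-partition error.

```scala
import java.io.IOException

object LogInitSketch {
  // Hypothetical stand-in for KafkaStorageException, defined locally so the
  // sketch compiles without Kafka on the classpath.
  final class StorageException(message: String, cause: Throwable)
    extends RuntimeException(message, cause)

  // Assumed shape of the fix: run the initialization steps and convert any
  // IOException into the storage exception that callers already handle.
  def initializeLog(dir: String)(steps: => Unit): Unit = {
    try {
      steps
    } catch {
      case e: IOException =>
        throw new StorageException(s"Error while initializing log in dir $dir", e)
    }
  }

  // Mimics the per-partition handler: only the storage exception becomes a
  // per-partition error code; any other exception propagates to the caller.
  def makeLeader(steps: => Unit): String = {
    try {
      steps
      "NONE"
    } catch {
      case _: StorageException => "KAFKA_STORAGE_ERROR"
    }
  }
}
```

With a step that throws `new IOException(...)`, `makeLeader(step)` rethrows the IOException, whereas `makeLeader(initializeLog(dir)(step))` returns `"KAFKA_STORAGE_ERROR"`. Whether the real fix should additionally call `logDirFailureChannel.maybeAddOfflineLogDir` is left open here, as in the description.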



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
