[ https://issues.apache.org/jira/browse/KAFKA-13468?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Haoze Wu updated KAFKA-13468:
-----------------------------
Description:
When the Kafka Log class (`core/src/main/scala/kafka/log/Log.scala`) is initialized, it may encounter an IOException in its locally block, e.g., when the log directory cannot be created due to a permission issue, or when `initializeLeaderEpochCache`, `initializePartitionMetadata`, etc. throw an IOException.
{code:java}
class Log(...) {
  // ...
  locally {
    // create the log directory if it doesn't exist
    Files.createDirectories(dir.toPath)
    initializeLeaderEpochCache()
    initializePartitionMetadata()
    val nextOffset = loadSegments()
    // ...
  }
  // ...
}{code}
We found that the broker encountering the IOException prints a KafkaApi error log like the following and then proceeds:
{code:java}
[2021-11-17 22:41:30,057] ERROR [KafkaApi-1] Error when handling request: clientId=1, correlationId=1, api=LEADER_AND_ISR, version=5, body=LeaderAndIsrRequestData(controllerId=1, controllerEpoch=1, brokerEpoch=4294967362, type=0, ungroupedPartitionStates=[], topicStates=[LeaderAndIsrTopicState(topicName='gray-2-0', topicId=573bAVHfRQeXApzAKevNIg, partitionStates=[LeaderAndIsrPartitionState(topicName='gray-2-0', partitionIndex=1, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1, 3], zkVersion=0, replicas=[1, 3], addingReplicas=[], removingReplicas=[], isNew=true)]), LeaderAndIsrTopicState(topicName='gray-1-0', topicId=12dW2FxLTiyKmGi41HhdZQ, partitionStates=[LeaderAndIsrPartitionState(topicName='gray-1-0', partitionIndex=1, controllerEpoch=1, leader=3, leaderEpoch=0, isr=[3, 1], zkVersion=0, replicas=[3, 1], addingReplicas=[], removingReplicas=[], isNew=true)]), LeaderAndIsrTopicState(topicName='gray-3-0', topicId=_yvmANyZSoK_PTV0e-nqCA, partitionStates=[LeaderAndIsrPartitionState(topicName='gray-3-0', partitionIndex=1, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1, 3], zkVersion=0, replicas=[1, 3], addingReplicas=[], removingReplicas=[], isNew=true)])], liveLeaders=[LeaderAndIsrLiveLeader(brokerId=1, hostName='localhost', port=9791), LeaderAndIsrLiveLeader(brokerId=3, hostName='localhost', port=9793)]) (kafka.server.RequestHandlerHelper) {code}
However, all the consumers consuming data from the affected topics ("gray-2-0", "gray-1-0", "gray-3-0") are unable to proceed. These consumers emit no error log related to this issue; they simply hang for more than 3 minutes.

The IOException sometimes affects multiple partitions of the `__consumer_offsets` topic:
{code:java}
[2021-11-18 10:57:41,289] ERROR [KafkaApi-1] Error when handling request: clientId=1, correlationId=11, api=LEADER_AND_ISR, version=5, body=LeaderAndIsrRequestData(controllerId=1, controllerEpoch=1, brokerEpoch=4294967355, type=0, ungroupedPartitionStates=[], topicStates=[LeaderAndIsrTopicState(topicName='__consumer_offsets', topicId=_MiMTCViS76osIyDdxekIg, partitionStates=[LeaderAndIsrPartitionState(topicName='__consumer_offsets', partitionIndex=15, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], addingReplicas=[], removingReplicas=[], isNew=true), LeaderAndIsrPartitionState(topicName='__consumer_offsets', partitionIndex=48, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], addingReplicas=[], removingReplicas=[], isNew=true), LeaderAndIsrPartitionState(topicName='__consumer_offsets', partitionIndex=45, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], addingReplicas=[], removingReplicas=[], isNew=true), ...
addingReplicas=[], removingReplicas=[], isNew=true), LeaderAndIsrPartitionState(topicName='__consumer_offsets', partitionIndex=33, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], addingReplicas=[], removingReplicas=[], isNew=true)])], liveLeaders=[LeaderAndIsrLiveLeader(brokerId=1, hostName='localhost', port=9791)]) (kafka.server.RequestHandlerHelper) {code}
*Analysis*

The key stack trace is as follows:
{code:java}
"java.lang.Thread,run,748",
"kafka.server.KafkaRequestHandler,run,74",
"kafka.server.KafkaApis,handle,236",
"kafka.server.KafkaApis,handleLeaderAndIsrRequest,258",
"kafka.server.ReplicaManager,becomeLeaderOrFollower,1411",
"kafka.server.ReplicaManager,makeLeaders,1566",
"scala.collection.mutable.HashMap,foreachEntry,499",
"scala.collection.mutable.HashMap$Node,foreachEntry,633",
"kafka.utils.Implicits$MapExtensionMethods$,$anonfun$forKeyValue$1,62",
"kafka.server.ReplicaManager,$anonfun$makeLeaders$5,1568",
"kafka.cluster.Partition,makeLeader,548",
"kafka.cluster.Partition,$anonfun$makeLeader$1,564",
"kafka.cluster.Partition,createLogIfNotExists,324",
"kafka.cluster.Partition,createLog,344",
"kafka.log.LogManager,getOrCreateLog,783",
"scala.Option,getOrElse,201",
"kafka.log.LogManager,$anonfun$getOrCreateLog$1,830",
"kafka.log.Log$,apply,2601",
"kafka.log.Log,<init>,323" {code}
Basically, the IOException is not handled by Log; instead, it propagates all the way back to `core/src/main/scala/kafka/server/KafkaApis.scala`:
{code:java}
override def handle(request: RequestChannel.Request): Unit = {
  try {
    request.header.apiKey match {
      // ...
      case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
      // ...
    }
  } catch {
    case e: FatalExitError => throw e
    case e: Throwable => requestHelper.handleError(request, e)
  } finally {
    // ...
  }
}
{code}
I also notice that ReplicaManager (`core/src/main/scala/kafka/server/ReplicaManager.scala`) has a relevant comment about an "unexpected error", together with a TODO:
{code:java}
/*
 * Make the current broker to become leader for a given set of partitions by:
 *
 * 1. Stop fetchers for these partitions
 * 2. Update the partition metadata in cache
 * 3. Add these partitions to the leader partitions set
 *
 * If an unexpected error is thrown in this function, it will be propagated to KafkaApis where
 * the error message will be set on each partition since we do not know which partition caused it. Otherwise,
 * return the set of partitions that are made leader due to this method
 *
 * TODO: the above may need to be fixed later
 */
private def makeLeaders(...): Set[Partition] = {
  // ...
  try {
    // ...
    partitionStates.forKeyValue { (partition, partitionState) =>
      try {
        if (partition.makeLeader(partitionState, highWatermarkCheckpoints)) // line 1568
          partitionsToMakeLeaders += partition
        else
          stateChangeLogger.info(...)
      } catch {
        case e: KafkaStorageException =>
          stateChangeLogger.error(...)
          val dirOpt = getLogDir(partition.topicPartition)
          error(...)
          responseMap.put(partition.topicPartition, Errors.KAFKA_STORAGE_ERROR)
      }
    }
  } catch {
    case e: Throwable =>
      partitionStates.keys.foreach { partition =>
        stateChangeLogger.error(...)
      }
      // Re-throw the exception for it to be caught in KafkaApis
      throw e
  }
  // ...
}
{code}
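To make the failure path above concrete, the following self-contained Scala toy reproduces the control flow implied by the stack trace. All names here are our own stand-ins, not Kafka's real classes: the per-partition handler in `makeLeaders` only recognizes KafkaStorageException, so a raw IOException from `Log.<init>` skips it and lands in the generic `Throwable` handler, which is how `KafkaApis.handle` ends up logging a single request-level error while no per-partition error code reaches the clients.
{code:java}
import java.io.IOException

// Stand-in for org.apache.kafka.common.errors.KafkaStorageException; defined
// here only to keep the toy self-contained.
class KafkaStorageException(msg: String, cause: Throwable) extends RuntimeException(msg, cause)

object PropagationToy {
  // Simulates Partition.makeLeader -> ... -> Log.<init> failing with a raw IOException.
  def makeLeader(): Boolean =
    throw new IOException("cannot create log directory: permission denied")

  def main(args: Array[String]): Unit = {
    try {
      try {
        makeLeader()
      } catch {
        // Mirrors the per-partition catch in ReplicaManager.makeLeaders: only a
        // KafkaStorageException is converted into a KAFKA_STORAGE_ERROR response
        // for the affected partition.
        case e: KafkaStorageException =>
          println(s"per-partition response: KAFKA_STORAGE_ERROR ($e)")
      }
    } catch {
      // A raw IOException skips the catch above and reaches the generic handler,
      // as in KafkaApis.handle: one request-level error log, no per-partition
      // error code, and the consumers are left waiting.
      case e: Throwable =>
        println(s"request-level error only (KafkaApis-style): $e")
    }
  }
}
{code}
Running it prints only the request-level line, matching the symptom: the broker logs a KafkaApi error and proceeds, while no storage error is attached to the affected partitions.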
*Fix*

To fix this issue, I think we should catch the potential IOException when Log is initialized and rethrow it as a KafkaStorageException, just like many other IOException handlers in Kafka, e.g., [https://github.com/apache/kafka/blob/ebb1d6e21cc9213071ee1c6a15ec3411fc215b81/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala#L92-L120] (a minimal sketch of such a guard follows after this description). After applying this fix, the aforementioned symptoms disappear: the consumers no longer hang and proceed to finish the remaining workload.

One open question is whether we should also use `logDirFailureChannel.maybeAddOfflineLogDir` to handle the IOException, like [https://github.com/apache/kafka/blob/ebb1d6e21cc9213071ee1c6a15ec3411fc215b81/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala#L92-L120] and [https://github.com/apache/kafka/blob/ebb1d6e21cc9213071ee1c6a15ec3411fc215b81/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala#L126-L139]. If so, `logDirFailureChannel.maybeAddOfflineLogDir` would crash the node according to the protocol in [https://github.com/apache/kafka/blob/ebb1d6e21cc9213071ee1c6a15ec3411fc215b81/core/src/main/scala/kafka/server/ReplicaManager.scala#L268-L277] and [https://github.com/apache/kafka/blob/ebb1d6e21cc9213071ee1c6a15ec3411fc215b81/core/src/main/scala/kafka/server/ReplicaManager.scala#L327-L332].

P.S. This issue holds from Kafka version 2.8.0 through 3.0.0. On the current trunk branch, `core/src/main/scala/kafka/log/Log.scala` has been renamed to `core/src/main/scala/kafka/log/UnifiedLog.scala` with some small code changes, but the issue still holds, so we are submitting the pull request with the fix against trunk. We also propose backporting the fix to versions 2.8.0, 3.0.0, etc., in a separate pull request.
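For concreteness, here is a minimal sketch of the proposed guard, modeled on Kafka's existing IOException handlers such as the CheckpointFile one linked above. The stand-in exception class, the object name, and the simplified initialization body are ours for illustration only, not the actual patch.
{code:java}
import java.io.IOException
import java.nio.file.{Files, Paths}

// Stand-in for org.apache.kafka.common.errors.KafkaStorageException.
class KafkaStorageException(msg: String, cause: Throwable) extends RuntimeException(msg, cause)

object LogInitGuardSketch {
  // Runs an initialization step and converts any IOException into the
  // KafkaStorageException that ReplicaManager.makeLeaders already maps to
  // Errors.KAFKA_STORAGE_ERROR for the affected partition.
  def maybeHandleIOException[T](msg: => String)(fun: => T): T =
    try {
      fun
    } catch {
      case e: IOException =>
        // Whether to also call logDirFailureChannel.maybeAddOfflineLogDir(...)
        // here is the open question discussed above; per the linked
        // ReplicaManager protocol, it could crash the node.
        throw new KafkaStorageException(msg, e)
    }

  def main(args: Array[String]): Unit = {
    val dir = Paths.get("/tmp/kafka-log-sketch/topic-0")
    maybeHandleIOException(s"Error while initializing log in dir $dir") {
      // The steps from Log's locally block; each may throw an IOException,
      // e.g. Files.createDirectories on a permission problem.
      Files.createDirectories(dir)
      // initializeLeaderEpochCache(); initializePartitionMetadata(); loadSegments(); ...
    }
  }
}
{code}
With a guard like this in place, the IOException surfaces as a KafkaStorageException inside the per-partition catch shown in the Analysis section, so the broker returns KAFKA_STORAGE_ERROR for the affected partitions instead of failing the whole LeaderAndIsr request.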
> Consumers may hang because IOException in Log#<init> does not trigger
> KafkaStorageException
> -------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-13468
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13468
>             Project: Kafka
>          Issue Type: Bug
>          Components: log
>    Affects Versions: 2.8.0
>            Reporter: Haoze Wu
>            Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.20.1#820001)