[ https://issues.apache.org/jira/browse/KAFKA-13468?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Divij Vaidya updated KAFKA-13468:
---------------------------------
    Affects Version/s: 3.0.0
                       4.0.0

Consumers may hang because IOException in Log#<init> does not trigger KafkaStorageException
--------------------------------------------------------------------------------------------

                 Key: KAFKA-13468
                 URL: https://issues.apache.org/jira/browse/KAFKA-13468
             Project: Kafka
          Issue Type: Bug
          Components: log
    Affects Versions: 2.8.0, 3.0.0, 4.0.0
            Reporter: Haoze Wu
            Priority: Major

When the Kafka Log class (`core/src/main/scala/kafka/log/Log.scala`) is initialized, it may encounter an IOException in the `locally` block, e.g., when the log directory cannot be created due to a permission issue, or when an IOException is thrown in `initializeLeaderEpochCache`, `initializePartitionMetadata`, etc.

{code:java}
class Log(...) {
  // ...
  locally {
    // create the log directory if it doesn't exist
    Files.createDirectories(dir.toPath)
    initializeLeaderEpochCache()
    initializePartitionMetadata()
    val nextOffset = loadSegments()
    // ...
  }
  // ...
}{code}

We found that the broker that encounters the IOException prints a KafkaApis error log like the following and then proceeds:

{code:java}
[2021-11-17 22:41:30,057] ERROR [KafkaApi-1] Error when handling request: clientId=1, correlationId=1, api=LEADER_AND_ISR, version=5, body=LeaderAndIsrRequestData(controllerId=1, controllerEpoch=1, brokerEpoch=4294967362, type=0, ungroupedPartitionStates=[], topicStates=[LeaderAndIsrTopicState(topicName='gray-2-0', topicId=573bAVHfRQeXApzAKevNIg, partitionStates=[LeaderAndIsrPartitionState(topicName='gray-2-0', partitionIndex=1, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1, 3], zkVersion=0, replicas=[1, 3], addingReplicas=[], removingReplicas=[], isNew=true)]), LeaderAndIsrTopicState(topicName='gray-1-0', topicId=12dW2FxLTiyKmGi41HhdZQ, partitionStates=[LeaderAndIsrPartitionState(topicName='gray-1-0', partitionIndex=1, controllerEpoch=1, leader=3, leaderEpoch=0, isr=[3, 1], zkVersion=0, replicas=[3, 1], addingReplicas=[], removingReplicas=[], isNew=true)]), LeaderAndIsrTopicState(topicName='gray-3-0', topicId=_yvmANyZSoK_PTV0e-nqCA, partitionStates=[LeaderAndIsrPartitionState(topicName='gray-3-0', partitionIndex=1, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1, 3], zkVersion=0, replicas=[1, 3], addingReplicas=[], removingReplicas=[], isNew=true)])], liveLeaders=[LeaderAndIsrLiveLeader(brokerId=1, hostName='localhost', port=9791), LeaderAndIsrLiveLeader(brokerId=3, hostName='localhost', port=9793)]) (kafka.server.RequestHandlerHelper)
{code}

But all the consumers that are consuming data from the affected topics ("gray-2-0", "gray-1-0", "gray-3-0") are not able to proceed. These consumers do not log any error related to this issue; they simply hang for more than 3 minutes.
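For context, here is a minimal sketch of the kind of consumer loop that exhibits the hang. It is not taken from our test setup: only the topic names and the broker port come from the log above, while the group id, serializers, and poll interval are made-up illustration values.

{code:java}
import java.time.Duration
import java.util.Properties
import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer

object GrayTopicsConsumer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9791")  // broker 1 from the log above
    props.put("group.id", "gray-consumer-group")      // hypothetical group id
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(Seq("gray-1-0", "gray-2-0", "gray-3-0").asJava)
    try {
      while (true) {
        // No exception ever reaches the client; poll() just keeps returning empty
        // batches, so the loop appears to hang, matching the symptom described above.
        val records = consumer.poll(Duration.ofSeconds(1))
        records.asScala.foreach { r =>
          println(s"${r.topic()}-${r.partition()}@${r.offset()}: ${r.value()}")
        }
      }
    } finally consumer.close()
  }
}
{code}

From the client's point of view nothing fails, which is consistent with the absence of any consumer-side error log.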
The IOException sometimes affects multiple offset topics:

{code:java}
[2021-11-18 10:57:41,289] ERROR [KafkaApi-1] Error when handling request: clientId=1, correlationId=11, api=LEADER_AND_ISR, version=5, body=LeaderAndIsrRequestData(controllerId=1, controllerEpoch=1, brokerEpoch=4294967355, type=0, ungroupedPartitionStates=[], topicStates=[LeaderAndIsrTopicState(topicName='__consumer_offsets', topicId=_MiMTCViS76osIyDdxekIg, partitionStates=[LeaderAndIsrPartitionState(topicName='__consumer_offsets', partitionIndex=15, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], addingReplicas=[], removingReplicas=[], isNew=true), LeaderAndIsrPartitionState(topicName='__consumer_offsets', partitionIndex=48, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], addingReplicas=[], removingReplicas=[], isNew=true), LeaderAndIsrPartitionState(topicName='__consumer_offsets', partitionIndex=45, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], addingReplicas=[], removingReplicas=[], isNew=true), ... addingReplicas=[], removingReplicas=[], isNew=true), LeaderAndIsrPartitionState(topicName='__consumer_offsets', partitionIndex=33, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], addingReplicas=[], removingReplicas=[], isNew=true)])], liveLeaders=[LeaderAndIsrLiveLeader(brokerId=1, hostName='localhost', port=9791)]) (kafka.server.RequestHandlerHelper)
{code}

*Analysis*

The key stack trace is as follows:

{code:java}
"java.lang.Thread,run,748",
"kafka.server.KafkaRequestHandler,run,74",
"kafka.server.KafkaApis,handle,236",
"kafka.server.KafkaApis,handleLeaderAndIsrRequest,258",
"kafka.server.ReplicaManager,becomeLeaderOrFollower,1411",
"kafka.server.ReplicaManager,makeLeaders,1566",
"scala.collection.mutable.HashMap,foreachEntry,499",
"scala.collection.mutable.HashMap$Node,foreachEntry,633",
"kafka.utils.Implicits$MapExtensionMethods$,$anonfun$forKeyValue$1,62",
"kafka.server.ReplicaManager,$anonfun$makeLeaders$5,1568",
"kafka.cluster.Partition,makeLeader,548",
"kafka.cluster.Partition,$anonfun$makeLeader$1,564",
"kafka.cluster.Partition,createLogIfNotExists,324",
"kafka.cluster.Partition,createLog,344",
"kafka.log.LogManager,getOrCreateLog,783",
"scala.Option,getOrElse,201",
"kafka.log.LogManager,$anonfun$getOrCreateLog$1,830",
"kafka.log.Log$,apply,2601",
"kafka.log.Log,<init>,323"
{code}

Basically, the IOException is not handled by Log but instead gets propagated all the way back to `core/src/main/scala/kafka/server/KafkaApis.scala`:

{code:java}
override def handle(request: RequestChannel.Request): Unit = {
  try {
    request.header.apiKey match {
      // ...
      case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
      // ...
    }
  } catch {
    case e: FatalExitError => throw e
    case e: Throwable => requestHelper.handleError(request, e)
  } finally {
    // ...
  }
}
{code}

I also notice that the ReplicaManager in `core/src/main/scala/kafka/server/ReplicaManager.scala` has a relevant comment about an "unexpected error", with a TODO:

{code:java}
/*
 * Make the current broker to become leader for a given set of partitions by:
 *
 * 1. Stop fetchers for these partitions
 * 2. Update the partition metadata in cache
 * 3. Add these partitions to the leader partitions set
 *
 * If an unexpected error is thrown in this function, it will be propagated to KafkaApis where
 * the error message will be set on each partition since we do not know which partition caused it. Otherwise,
 * return the set of partitions that are made leader due to this method
 *
 * TODO: the above may need to be fixed later
 */
private def makeLeaders(...): Set[Partition] = {
  // ...
  try {
    // ...
    partitionStates.forKeyValue { (partition, partitionState) =>
      try {
        if (partition.makeLeader(partitionState, highWatermarkCheckpoints)) // line 1568
          partitionsToMakeLeaders += partition
        else
          stateChangeLogger.info(...)
      } catch {
        case e: KafkaStorageException =>
          stateChangeLogger.error(...)
          val dirOpt = getLogDir(partition.topicPartition)
          error(...)
          responseMap.put(partition.topicPartition, Errors.KAFKA_STORAGE_ERROR)
      }
    }
  } catch {
    case e: Throwable =>
      partitionStates.keys.foreach { partition =>
        stateChangeLogger.error(...)
      }
      // Re-throw the exception for it to be caught in KafkaApis
      throw e
  }
  // ...
}
{code}

*Fix*

To fix this issue, I think we should catch the potential IOException when Log is initialized, and then throw a KafkaStorageException, just like many other IOException handlers in Kafka, e.g., [https://github.com/apache/kafka/blob/2.8.0/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala#L92-L120].

After applying this fix, the aforementioned symptoms disappear, i.e., the consumers no longer hang and proceed to finish the remaining workload.

One question is whether we should also use `logDirFailureChannel.maybeAddOfflineLogDir` to handle the IOException, as in [https://github.com/apache/kafka/blob/2.8.0/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala#L92-L120] and [https://github.com/apache/kafka/blob/2.8.0/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala#L126-L139]. If so, `logDirFailureChannel.maybeAddOfflineLogDir` would crash the node according to the protocol in [https://github.com/apache/kafka/blob/2.8.0/core/src/main/scala/kafka/server/ReplicaManager.scala#L268-L277] and [https://github.com/apache/kafka/blob/2.8.0/core/src/main/scala/kafka/server/ReplicaManager.scala#L327-L332].
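For illustration, here is a minimal sketch of the shape such a fix could take. It is not the actual patch: it assumes the existing body of the `locally` block stays as is, that `dir`, `topicPartition`, and the `logDirFailureChannel` passed into `Log` (2.8.0) are in scope, the error message text is invented, and the `maybeAddOfflineLogDir` call only marks where the optional step discussed above would go.

{code:java}
// Requires: import java.io.IOException and org.apache.kafka.common.errors.KafkaStorageException
locally {
  try {
    // create the log directory if it doesn't exist
    Files.createDirectories(dir.toPath)
    initializeLeaderEpochCache()
    initializePartitionMetadata()
    val nextOffset = loadSegments()
    // ...
  } catch {
    case e: IOException =>
      val msg = s"Error while initializing log for $topicPartition in dir ${dir.getParent}"
      // Optional step (the open question above): also report the parent log dir as offline,
      // mirroring the CheckpointFile.scala pattern linked above.
      logDirFailureChannel.maybeAddOfflineLogDir(dir.getParent, msg, e)
      // Surface the failure as a storage error instead of letting the raw IOException escape.
      throw new KafkaStorageException(msg, e)
  }
}
{code}

With only the `KafkaStorageException` part, the per-partition catch in `makeLeaders` shown above turns the failure into `Errors.KAFKA_STORAGE_ERROR` for the affected partition; adding the `maybeAddOfflineLogDir` call additionally reports the whole log directory to the LogDirFailureChannel, which is where the crash-the-node concern above comes from.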
P.S. This issue holds from Kafka version 2.8.0 to 3.0.0. In the current trunk branch, `core/src/main/scala/kafka/log/Log.scala` has been renamed to `core/src/main/scala/kafka/log/UnifiedLog.scala` and there are some small code changes; however, the issue still holds, so we are submitting the pull request with the fix against the trunk branch. We also propose that the fix be applied to versions 2.8.0, 3.0.0, etc., in a separate pull request.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)