jsancio commented on code in PR #12596:
URL: https://github.com/apache/kafka/pull/12596#discussion_r964300580
##########
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala:
##########
@@ -41,8 +43,19 @@ class BrokerMetadataListener(
   val maxBytesBetweenSnapshots: Long,
   val snapshotter: Option[MetadataSnapshotter],
   brokerMetrics: BrokerServerMetrics,
-  metadataLoadingFaultHandler: FaultHandler
+  _metadataLoadingFaultHandler: FaultHandler
 ) extends RaftClient.Listener[ApiMessageAndVersion] with KafkaMetricsGroup {
+
+  private val metadataFaultOccurred = new AtomicBoolean(false)
+  private val metadataLoadingFaultHandler: FaultHandler = new FaultHandler() {
+    override def handleFault(failureMessage: String, cause: Throwable): RuntimeException = {
+      if (metadataFaultOccurred.compareAndSet(false, true)) {
+        error("Disabling metadata snapshots until this broker is restarted.")
+      }
+      _metadataLoadingFaultHandler.handleFault(failureMessage, cause)
+    }

Review Comment:
   This abstraction feels strange. For example, how would an operator detect that Kafka has hit a problem and has stopped generating snapshots? I assume they would need to monitor the `BrokerServerMetrics.metadataLoadErrorCount` metric, which is updated from `KafkaRaftServer`. The disabling of snapshotting, however, happens in `BrokerMetadataListener`, which doesn't know about this metric.

   I think the solution should make this relationship explicit rather than hiding it or spreading it across multiple layers of abstraction.
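   To illustrate what "explicit" could mean here, below is a rough sketch, not a proposal for the actual patch. It assumes a single handler that owns both effects of a metadata loading fault: bumping the error metric and disabling snapshots. `MetadataLoadMetrics` and `SnapshotDisablingFaultHandler` are hypothetical names, and the `FaultHandler` trait is a local stand-in that only mirrors the method signature visible in the diff above, so the snippet compiles on its own.

```scala
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}

// Local stand-in mirroring the handleFault signature shown in the diff.
trait FaultHandler {
  def handleFault(failureMessage: String, cause: Throwable): RuntimeException
}

// Hypothetical metrics holder; in Kafka the real counter lives in BrokerServerMetrics.
final class MetadataLoadMetrics {
  val metadataLoadErrorCount = new AtomicLong(0)
}

// Hypothetical handler that makes the metric/snapshot relationship explicit:
// every fault increments the metric, and the first fault disables snapshots.
final class SnapshotDisablingFaultHandler(
  metrics: MetadataLoadMetrics,
  delegate: FaultHandler
) extends FaultHandler {
  private val faultOccurred = new AtomicBoolean(false)

  // True once any metadata loading fault has been reported.
  def snapshotsDisabled: Boolean = faultOccurred.get()

  override def handleFault(failureMessage: String, cause: Throwable): RuntimeException = {
    metrics.metadataLoadErrorCount.incrementAndGet()
    // Log only on the first fault, as the diff does.
    if (faultOccurred.compareAndSet(false, true)) {
      System.err.println("Disabling metadata snapshots until this broker is restarted.")
    }
    delegate.handleFault(failureMessage, cause)
  }
}
```

   With something along these lines, the listener would only need to consult `snapshotsDisabled`, and a non-zero `metadataLoadErrorCount` would by construction mean that snapshots are off.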