jsancio commented on code in PR #12596:
URL: https://github.com/apache/kafka/pull/12596#discussion_r964300580


##########
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala:
##########
@@ -41,8 +43,19 @@ class BrokerMetadataListener(
   val maxBytesBetweenSnapshots: Long,
   val snapshotter: Option[MetadataSnapshotter],
   brokerMetrics: BrokerServerMetrics,
-  metadataLoadingFaultHandler: FaultHandler
+  _metadataLoadingFaultHandler: FaultHandler
 ) extends RaftClient.Listener[ApiMessageAndVersion] with KafkaMetricsGroup {
+
+  private val metadataFaultOccurred = new AtomicBoolean(false)
+  private val metadataLoadingFaultHandler: FaultHandler = new FaultHandler() {
+    override def handleFault(failureMessage: String, cause: Throwable): RuntimeException = {
+      if (metadataFaultOccurred.compareAndSet(false, true)) {
+        error("Disabling metadata snapshots until this broker is restarted.")
+      }
+      _metadataLoadingFaultHandler.handleFault(failureMessage, cause)
+    }

Review Comment:
   This abstraction feels strange. For example, how does the operator detect 
that Kafka has an issue and is not generating snapshots? I assume they need 
to monitor the `BrokerServerMetrics.metadataLoadErrorCount` metric, which is 
updated from `KafkaRaftServer`. Snapshotting, however, is disabled in 
`BrokerMetadataListener`, which doesn't know about this metric.
   
   I think the solution should make this relationship explicit rather than 
leaving it hidden or implemented across multiple layers of abstraction.
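   
   To illustrate what "explicit" could look like, here is a rough sketch, not 
a proposal for the final code. It assumes a single handler owns both effects of 
a metadata loading fault: bumping the error count and disabling snapshots. The 
names `LoadErrorMetrics` and `MetadataLoadingFaultTracker` are made up for this 
example, and `FaultHandler` is redeclared locally as a stand-in so the snippet 
compiles on its own.

```scala
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}

// Local stand-in for the real FaultHandler interface (assumption: same method shape
// as the handleFault override in the diff above).
trait FaultHandler {
  def handleFault(failureMessage: String, cause: Throwable): RuntimeException
}

// Hypothetical stand-in for the broker metrics object; only the counter matters here.
final class LoadErrorMetrics {
  val metadataLoadErrorCount = new AtomicLong(0)
}

// Hypothetical handler that keeps the metric update and the "stop snapshotting"
// decision in one place, so the two cannot drift apart across layers.
final class MetadataLoadingFaultTracker(
  metrics: LoadErrorMetrics,
  delegate: FaultHandler
) extends FaultHandler {
  private val faultOccurred = new AtomicBoolean(false)

  // The listener would consult this before starting a snapshot.
  def snapshotsEnabled: Boolean = !faultOccurred.get()

  override def handleFault(failureMessage: String, cause: Throwable): RuntimeException = {
    // Every fault is counted; the flag flips on the first one and stays set.
    metrics.metadataLoadErrorCount.incrementAndGet()
    faultOccurred.set(true)
    delegate.handleFault(failureMessage, cause)
  }
}
```

   With something along these lines, a non-zero `metadataLoadErrorCount` and 
disabled snapshots always go together, so an operator watching the metric knows 
snapshots have stopped.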


