jsancio commented on code in PR #18304:
URL: https://github.com/apache/kafka/pull/18304#discussion_r1925822186


##########
checkstyle/import-control.xml:
##########


Review Comment:
   Please undo all of the changes to this file. See my other comments on how to 
do this.



##########
core/src/main/scala/kafka/raft/DefaultExternalKRaftMetrics.scala:
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.raft
+
+import org.apache.kafka.controller.metrics.ControllerMetadataMetrics
+import org.apache.kafka.raft.ExternalKRaftMetrics
+import org.apache.kafka.server.metrics.BrokerServerMetrics
+
+class DefaultExternalKRaftMetrics(
+  val brokerServerMetrics: BrokerServerMetrics,
+  val controllerMetadataMetrics: ControllerMetadataMetrics
+) extends ExternalKRaftMetrics {
+  val brokerServerMetricsOpt: Option[BrokerServerMetrics] = Option(brokerServerMetrics)
+  val controllerMetadataMetricsOpt: Option[ControllerMetadataMetrics] = Option(controllerMetadataMetrics)

Review Comment:
   Did you consider making the object parameters optional? We should avoid 
using `null` in public interfaces. 
   ```scala
   class DefaultExternalKRaftMetrics(
     val brokerServerMetrics: Option[BrokerServerMetrics],
     val controllerMetadataMetrics: Option[ControllerMetadataMetrics]
   ) extends ExternalKRaftMetrics {
   ```
   
   
https://www.infoq.com/presentations/Null-References-The-Billion-Dollar-Mistake-Tony-Hoare/



##########
core/src/main/scala/kafka/raft/RaftManager.scala:
##########
@@ -119,7 +119,8 @@ class KafkaRaftManager[T](
   val controllerQuorumVotersFuture: CompletableFuture[JMap[Integer, InetSocketAddress]],
   bootstrapServers: JCollection[InetSocketAddress],
   localListeners: Endpoints,
-  fatalFaultHandler: FaultHandler
+  fatalFaultHandler: FaultHandler,
+  externalKRaftMetrics: ExternalKRaftMetrics

Review Comment:
   I would group this with the `metrics: Metrics` parameter above.



##########
metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetrics.java:
##########
@@ -65,6 +69,8 @@ public final class ControllerMetadataMetrics implements AutoCloseable {
     private final AtomicInteger metadataErrorCount = new AtomicInteger(0);
     private Optional<Meter> uncleanLeaderElectionMeter = Optional.empty();
 
+    private final AtomicBoolean ignoredStaticVoters = new AtomicBoolean(false);

Review Comment:
   There is extra spacing here; remove it if you want to keep this consistent with the rest of the private fields.



##########
server/src/main/java/org/apache/kafka/server/metrics/BrokerServerMetrics.java:
##########
@@ -55,13 +57,17 @@ public final class BrokerServerMetrics implements AutoCloseable {
     private final AtomicLong metadataLoadErrorCount = new AtomicLong(0);
     private final AtomicLong metadataApplyErrorCount = new AtomicLong(0);
 
+    private final AtomicBoolean ignoredStaticVoters = new AtomicBoolean(false);
+
     private final Metrics metrics;
     private final MetricName lastAppliedRecordOffsetName;
     private final MetricName lastAppliedRecordTimestampName;
     private final MetricName lastAppliedRecordLagMsName;
     private final MetricName metadataLoadErrorCountName;
     private final MetricName metadataApplyErrorCountName;
 
+    private final MetricName ignoredStaticVotersName;

Review Comment:
   You can remove the extra newline.



##########
server/src/main/java/org/apache/kafka/server/metrics/BrokerServerMetrics.java:
##########
@@ -89,12 +95,18 @@ public BrokerServerMetrics(Metrics metrics) {
                 METRIC_GROUP_NAME,
                 "The number of errors encountered by the BrokerMetadataPublisher while applying a new MetadataImage based on the latest MetadataDelta."
         );
+        ignoredStaticVotersName = metrics.metricName(
+            "ignored-static-voters",
+            METRIC_GROUP_NAME,
+            "This value is 1 when the current voter set for the metadata topic partition is being read from the log and 0 when it is read from the static configuration."

Review Comment:
   This description doesn't seem accurate. Is this what I stated in the KIP? How about:
   
   > 1 if controller.quorum.voters is set but was not used by the broker, 0 
otherwise.
   
   Please feel free to update the KIP.



##########
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##########
@@ -237,6 +244,9 @@ public void resetRemoveVoterHandlerState(
                 .complete(RaftUtil.removeVoterResponse(error, message))
         );
         removeVoterHandlerState = state;
+        kafkaRaftMetrics.updateUncommittedVoterChange(
+            addVoterHandlerState.isPresent() || removeVoterHandlerState.isPresent()
+        );

Review Comment:
   Looks like code duplication. Move this to a private method.
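   
   A rough sketch of what I have in mind, using simplified stand-ins rather than the real classes. `VoterChangeMetricsStub`, `LeaderStateSketch` and the helper name `updateUncommittedVoterChangeMetric` are made up for illustration, and the reset methods here take fewer arguments than the real ones:
   ```java
   import java.util.Optional;

   // Stand-in for KafkaRaftMetrics; only the call used in this diff is modeled.
   class VoterChangeMetricsStub {
       boolean uncommittedVoterChange = false;

       void updateUncommittedVoterChange(boolean value) {
           uncommittedVoterChange = value;
       }
   }

   // Simplified stand-in for LeaderState; handler states are plain strings here.
   class LeaderStateSketch {
       private final VoterChangeMetricsStub kafkaRaftMetrics;
       private Optional<String> addVoterHandlerState = Optional.empty();
       private Optional<String> removeVoterHandlerState = Optional.empty();

       LeaderStateSketch(VoterChangeMetricsStub kafkaRaftMetrics) {
           this.kafkaRaftMetrics = kafkaRaftMetrics;
       }

       void resetAddVoterHandlerState(Optional<String> state) {
           addVoterHandlerState = state;
           updateUncommittedVoterChangeMetric();
       }

       void resetRemoveVoterHandlerState(Optional<String> state) {
           removeVoterHandlerState = state;
           updateUncommittedVoterChangeMetric();
       }

       // Hypothetical helper: the only place that derives the metric value.
       private void updateUncommittedVoterChangeMetric() {
           kafkaRaftMetrics.updateUncommittedVoterChange(
               addVoterHandlerState.isPresent() || removeVoterHandlerState.isPresent()
           );
       }
   }
   ```
   With this shape both reset paths share one expression, so the two call sites cannot drift apart.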



##########
raft/src/main/java/org/apache/kafka/raft/QuorumState.java:
##########
@@ -95,6 +97,8 @@ public class QuorumState {
     private final int fetchTimeoutMs;
     private final LogContext logContext;
 
+    private final KafkaRaftMetrics kafkaRaftMetrics;

Review Comment:
   There is extra spacing here; remove it if you want to keep this consistent with the rest of the private fields.



##########
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##########
@@ -691,6 +704,7 @@ private void updateVoterAndObserverStates(VoterSet lastVoterSet) {
         for (ReplicaState replicaStateEntry : oldVoterStates.values()) {
             replicaStateEntry.clearListeners();
             observerStates.putIfAbsent(replicaStateEntry.replicaKey, replicaStateEntry);
+            kafkaRaftMetrics.updateNumObservers(observerStates.size());

Review Comment:
   This method updates the number of observers N times. How about doing it once 
after all of the changes to `observerStates` are done?
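   
   Something along these lines, as a minimal sketch with stand-in types. `ObserverMetricsStub`, `ObserverUpdateSketch` and `demoteToObservers` are hypothetical names, and the replica state is reduced to a string:
   ```java
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   // Stand-in for KafkaRaftMetrics that also counts how often it is called.
   class ObserverMetricsStub {
       int numObservers = 0;
       int updateCalls = 0;

       void updateNumObservers(int value) {
           numObservers = value;
           updateCalls++;
       }
   }

   class ObserverUpdateSketch {
       // Old voters become observers; the metric is refreshed once, after the loop.
       static void demoteToObservers(
           List<String> oldVoterKeys,
           Map<String, String> observerStates,
           ObserverMetricsStub kafkaRaftMetrics
       ) {
           for (String replicaKey : oldVoterKeys) {
               observerStates.putIfAbsent(replicaKey, "state-of-" + replicaKey);
           }
           kafkaRaftMetrics.updateNumObservers(observerStates.size());
       }

       public static void main(String[] args) {
           ObserverMetricsStub metrics = new ObserverMetricsStub();
           Map<String, String> observers = new HashMap<>();
           demoteToObservers(List.of("a", "b", "c"), observers, metrics);
           // prints "3 observers, 1 update call(s)"
           System.out.println(metrics.numObservers + " observers, " + metrics.updateCalls + " update call(s)");
       }
   }
   ```
   The final value is the same as before; only the intermediate updates inside the loop go away.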



##########
core/src/main/scala/kafka/tools/TestRaftServer.scala:
##########
@@ -106,7 +111,11 @@ class TestRaftServer(
       CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumConfig.voters)),
       QuorumConfig.parseBootstrapServers(config.quorumConfig.bootstrapServers),
       endpoints,
-      new ProcessTerminatingFaultHandler.Builder().build()
+      new ProcessTerminatingFaultHandler.Builder().build(),
+      new DefaultExternalKRaftMetrics(
+        new BrokerServerMetrics(metrics),
+        new ControllerMetadataMetrics(Optional.of(new MetricsRegistry()))
+      )

Review Comment:
   Since these are optional and this is a test raft server, you can just do
   ```scala
         new DefaultExternalKRaftMetrics(None, None))
   ```
   or this if you keep the old signature.
   ```scala
         new DefaultExternalKRaftMetrics(null, null))
   ```



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -649,6 +655,7 @@ private void onBecomeLeader(long currentTimeMs) {
 
         resetConnections();
         kafkaRaftMetrics.maybeUpdateElectionLatency(currentTimeMs);
+        kafkaRaftMetrics.addLeaderMetrics();

Review Comment:
   Did you consider adding the leader metrics when constructing the 
`LeaderState` object?
   
   `LeaderState` also implements `void close()`; you can use that invocation to remove any leader-specific metrics. The `close` method is called any time the replica transitions out of a state. This should allow you to remove all of the `removeLeaderMetrics` calls littered throughout this file.
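   
   A minimal sketch of that lifecycle, assuming the metrics object is passed into the state's constructor. `LeaderMetricsStub` and `LeaderLifecycleSketch` are hypothetical stand-ins, and `AutoCloseable` is used here only to keep the example self-contained:
   ```java
   // Stand-in for KafkaRaftMetrics; it only tracks whether leader metrics are registered.
   class LeaderMetricsStub {
       boolean leaderMetricsRegistered = false;

       void addLeaderMetrics() {
           leaderMetricsRegistered = true;
       }

       void removeLeaderMetrics() {
           leaderMetricsRegistered = false;
       }
   }

   // Simplified stand-in for LeaderState: metrics follow the lifetime of the state object.
   class LeaderLifecycleSketch implements AutoCloseable {
       private final LeaderMetricsStub kafkaRaftMetrics;

       LeaderLifecycleSketch(LeaderMetricsStub kafkaRaftMetrics) {
           this.kafkaRaftMetrics = kafkaRaftMetrics;
           // Register leader-only metrics when the replica enters the leader state.
           kafkaRaftMetrics.addLeaderMetrics();
       }

       @Override
       public void close() {
           // Invoked when the replica transitions out of this state.
           kafkaRaftMetrics.removeLeaderMetrics();
       }
   }
   ```
   The client then never has to call add or remove directly, since creating and closing the state object covers both.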



##########
server/src/main/java/org/apache/kafka/server/metrics/BrokerServerMetrics.java:
##########
@@ -55,13 +57,17 @@ public final class BrokerServerMetrics implements AutoCloseable {
     private final AtomicLong metadataLoadErrorCount = new AtomicLong(0);
     private final AtomicLong metadataApplyErrorCount = new AtomicLong(0);
 
+    private final AtomicBoolean ignoredStaticVoters = new AtomicBoolean(false);

Review Comment:
   You can remove the extra newline.



##########
build.gradle:
##########
@@ -2040,8 +2040,12 @@ project(':raft') {
     testImplementation project(':server-common').sourceSets.test.output
     testImplementation project(':clients')
     testImplementation project(':clients').sourceSets.test.output
+    testImplementation project(':core')
+    testImplementation project(':server')
+    testImplementation project(':metadata')

Review Comment:
   Remove these dependencies. The entire raft module, including tests, should 
only depend on server-common and clients. The raft module should not depend on 
core, server or metadata.



##########
raft/src/main/java/org/apache/kafka/raft/VoterSet.java:
##########
@@ -158,6 +158,13 @@ public Set<VoterNode> voterNodes() {
         return new HashSet<>(voters.values());
     }
 
+    /**
+     * Returns size of the voter set.

Review Comment:
   How about "Returns the number of voters in the voter set."?



##########
raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java:
##########
@@ -87,14 +91,21 @@ public KRaftControlRecordStateMachine(
         RecordSerde<?> serde,
         BufferSupplier bufferSupplier,
         int maxBatchSizeBytes,
-        LogContext logContext
+        LogContext logContext,
+        KafkaRaftMetrics kafkaRaftMetrics,
+        ExternalKRaftMetrics externalKRaftMetrics
     ) {
         this.log = log;
         this.voterSetHistory = new VoterSetHistory(staticVoterSet, logContext);
         this.serde = serde;
         this.bufferSupplier = bufferSupplier;
         this.maxBatchSizeBytes = maxBatchSizeBytes;
         this.logger = logContext.logger(this.getClass());
+        this.kafkaRaftMetrics = kafkaRaftMetrics;
+        this.externalKRaftMetrics = externalKRaftMetrics;
+        this.staticVoterSet = Optional.ofNullable(staticVoterSet);
+
+        this.staticVoterSet.ifPresent(voters -> kafkaRaftMetrics.updateNumVoters(voters.size()));

Review Comment:
   Take a look at `truncateNewEntries`. The number of voters may also get 
updated because of log end truncation.
   
   `truncateOldEntries` should never truncate the latest entry but please take 
a look at that code to confirm.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
