jsancio commented on code in PR #18304:
URL: https://github.com/apache/kafka/pull/18304#discussion_r1925822186


##########
checkstyle/import-control.xml:
##########

Review Comment:
   Please undo all of the changes to this file. See my other comments on how to do this.


##########
core/src/main/scala/kafka/raft/DefaultExternalKRaftMetrics.scala:
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.raft
+
+import org.apache.kafka.controller.metrics.ControllerMetadataMetrics
+import org.apache.kafka.raft.ExternalKRaftMetrics
+import org.apache.kafka.server.metrics.BrokerServerMetrics
+
+class DefaultExternalKRaftMetrics(
+  val brokerServerMetrics: BrokerServerMetrics,
+  val controllerMetadataMetrics: ControllerMetadataMetrics
+) extends ExternalKRaftMetrics {
+  val brokerServerMetricsOpt: Option[BrokerServerMetrics] = Option(brokerServerMetrics)
+  val controllerMetadataMetricsOpt: Option[ControllerMetadataMetrics] = Option(controllerMetadataMetrics)

Review Comment:
   Did you consider making the object parameters optional? We should avoid using `null` in public interfaces.
   ```scala
   class DefaultExternalKRaftMetrics(
     val brokerServerMetrics: Option[BrokerServerMetrics],
     val controllerMetadataMetrics: Option[ControllerMetadataMetrics]
   ) extends ExternalKRaftMetrics {
   ```

   https://www.infoq.com/presentations/Null-References-The-Billion-Dollar-Mistake-Tony-Hoare/


##########
core/src/main/scala/kafka/raft/RaftManager.scala:
##########
@@ -119,7 +119,8 @@ class KafkaRaftManager[T](
   val controllerQuorumVotersFuture: CompletableFuture[JMap[Integer, InetSocketAddress]],
   bootstrapServers: JCollection[InetSocketAddress],
   localListeners: Endpoints,
-  fatalFaultHandler: FaultHandler
+  fatalFaultHandler: FaultHandler,
+  externalKRaftMetrics: ExternalKRaftMetrics

Review Comment:
   I would group this with the `metrics: Metrics` parameter above.


##########
metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetrics.java:
##########
@@ -65,6 +69,8 @@ public final class ControllerMetadataMetrics implements AutoCloseable {
     private final AtomicInteger metadataErrorCount = new AtomicInteger(0);
     private Optional<Meter> uncleanLeaderElectionMeter = Optional.empty();
+    private final AtomicBoolean ignoredStaticVoters = new AtomicBoolean(false);

Review Comment:
   Extra space if you want to keep it consistent with the rest of the private fields.


##########
server/src/main/java/org/apache/kafka/server/metrics/BrokerServerMetrics.java:
##########
@@ -55,13 +57,17 @@ public final class BrokerServerMetrics implements AutoCloseable {
     private final AtomicLong metadataLoadErrorCount = new AtomicLong(0);
     private final AtomicLong metadataApplyErrorCount = new AtomicLong(0);
+    private final AtomicBoolean ignoredStaticVoters = new AtomicBoolean(false);
+
     private final Metrics metrics;
     private final MetricName lastAppliedRecordOffsetName;
     private final MetricName lastAppliedRecordTimestampName;
     private final MetricName lastAppliedRecordLagMsName;
     private final MetricName metadataLoadErrorCountName;
     private final MetricName metadataApplyErrorCountName;
+    private final MetricName ignoredStaticVotersName;

Review Comment:
   You can remove the extra newline.


##########
server/src/main/java/org/apache/kafka/server/metrics/BrokerServerMetrics.java:
##########
@@ -89,12 +95,18 @@ public BrokerServerMetrics(Metrics metrics) {
             METRIC_GROUP_NAME,
             "The number of errors encountered by the BrokerMetadataPublisher while applying a new MetadataImage based on the latest MetadataDelta."
         );
+        ignoredStaticVotersName = metrics.metricName(
+            "ignored-static-voters",
+            METRIC_GROUP_NAME,
+            "This value is 1 when the current voter set for the metadata topic partition is being read from the log and 0 when it is read from the static configuration."

Review Comment:
   This description doesn't seem accurate. Is this what I state in the KIP? How about:
   > 1 if controller.quorum.voters is set but was not used by the broker, 0 otherwise.

   Please feel free to update the KIP.


##########
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##########
@@ -237,6 +244,9 @@ public void resetRemoveVoterHandlerState(
                 .complete(RaftUtil.removeVoterResponse(error, message))
         );
         removeVoterHandlerState = state;
+        kafkaRaftMetrics.updateUncommittedVoterChange(
+            addVoterHandlerState.isPresent() || removeVoterHandlerState.isPresent()
+        );

Review Comment:
   Looks like code duplication. Move this to a private method.


##########
raft/src/main/java/org/apache/kafka/raft/QuorumState.java:
##########
@@ -95,6 +97,8 @@ public class QuorumState {
     private final int fetchTimeoutMs;
     private final LogContext logContext;
+    private final KafkaRaftMetrics kafkaRaftMetrics;

Review Comment:
   Extra space if you want to keep it consistent with the rest of the private fields.


##########
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##########
@@ -691,6 +704,7 @@ private void updateVoterAndObserverStates(VoterSet lastVoterSet) {
         for (ReplicaState replicaStateEntry : oldVoterStates.values()) {
             replicaStateEntry.clearListeners();
             observerStates.putIfAbsent(replicaStateEntry.replicaKey, replicaStateEntry);
+            kafkaRaftMetrics.updateNumObservers(observerStates.size());

Review Comment:
   This method updates the number of observers N times. How about doing it once after all of the changes to `observerStates` are done?
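
A minimal sketch of the "update once" suggestion above. It uses hypothetical stand-ins (`CountingMetrics`, a plain `Map<String, String>` for `observerStates`, and `demoteToObservers`) rather than the real `LeaderState` and `KafkaRaftMetrics` classes, so treat it as an illustration of the shape of the change and not as the actual patch:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ObserverMetricSketch {
    /** Hypothetical stand-in for the metrics object; it counts how often it is updated. */
    static final class CountingMetrics {
        int updates = 0;
        int numObservers = 0;

        void updateNumObservers(int value) {
            updates++;
            numObservers = value;
        }
    }

    /** Moves every old voter into the observer map, then reports the size a single time. */
    static void demoteToObservers(
        List<String> oldVoters,
        Map<String, String> observerStates,
        CountingMetrics metrics
    ) {
        for (String voter : oldVoters) {
            // Only the map is mutated inside the loop.
            observerStates.putIfAbsent(voter, "state-of-" + voter);
        }
        // One metric update after all changes, instead of one per iteration.
        metrics.updateNumObservers(observerStates.size());
    }

    public static void main(String[] args) {
        CountingMetrics metrics = new CountingMetrics();
        Map<String, String> observerStates = new HashMap<>();
        demoteToObservers(List.of("a", "b", "c"), observerStates, metrics);
        System.out.println("updates=" + metrics.updates + " observers=" + metrics.numObservers);
    }
}
```

The final value reported is the same either way; the difference is that the metric is written once per call rather than once per demoted voter.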


##########
core/src/main/scala/kafka/tools/TestRaftServer.scala:
##########
@@ -106,7 +111,11 @@ class TestRaftServer(
       CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumConfig.voters)),
       QuorumConfig.parseBootstrapServers(config.quorumConfig.bootstrapServers),
       endpoints,
-      new ProcessTerminatingFaultHandler.Builder().build()
+      new ProcessTerminatingFaultHandler.Builder().build(),
+      new DefaultExternalKRaftMetrics(
+        new BrokerServerMetrics(metrics),
+        new ControllerMetadataMetrics(Optional.of(new MetricsRegistry()))
+      )

Review Comment:
   Since these are optional and this is a test raft server, you can just do
   ```scala
   new DefaultExternalKRaftMetrics(None, None))
   ```
   or this if you keep the old signature.
   ```scala
   new DefaultExternalKRaftMetrics(null, null))
   ```


##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -649,6 +655,7 @@ private void onBecomeLeader(long currentTimeMs) {
         resetConnections();
         kafkaRaftMetrics.maybeUpdateElectionLatency(currentTimeMs);
+        kafkaRaftMetrics.addLeaderMetrics();

Review Comment:
   Did you consider adding the leader metrics when constructing the `LeaderState` object? `LeaderState` also implements `void close();` so you can use that invocation to remove any leader-specific metrics. The `close` method is called any time the replica transitions out of a state. This should let you remove all of the `removeLeaderMetrics` litter throughout this file.


##########
server/src/main/java/org/apache/kafka/server/metrics/BrokerServerMetrics.java:
##########
@@ -55,13 +57,17 @@ public final class BrokerServerMetrics implements AutoCloseable {
     private final AtomicLong metadataLoadErrorCount = new AtomicLong(0);
     private final AtomicLong metadataApplyErrorCount = new AtomicLong(0);
+    private final AtomicBoolean ignoredStaticVoters = new AtomicBoolean(false);

Review Comment:
   You can remove the extra newline.


##########
build.gradle:
##########
@@ -2040,8 +2040,12 @@ project(':raft') {
     testImplementation project(':server-common').sourceSets.test.output
     testImplementation project(':clients')
     testImplementation project(':clients').sourceSets.test.output
+    testImplementation project(':core')
+    testImplementation project(':server')
+    testImplementation project(':metadata')

Review Comment:
   Remove these dependencies. The entire raft module, including tests, should only depend on server-common and clients. The raft module should not depend on core, server or metadata.


##########
raft/src/main/java/org/apache/kafka/raft/VoterSet.java:
##########
@@ -158,6 +158,13 @@ public Set<VoterNode> voterNodes() {
         return new HashSet<>(voters.values());
     }
+    /**
+     * Returns size of the voter set.

Review Comment:
   How about "Returns the number of voters in the voter set."?


##########
raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java:
##########
@@ -87,14 +91,21 @@ public KRaftControlRecordStateMachine(
         RecordSerde<?> serde,
         BufferSupplier bufferSupplier,
         int maxBatchSizeBytes,
-        LogContext logContext
+        LogContext logContext,
+        KafkaRaftMetrics kafkaRaftMetrics,
+        ExternalKRaftMetrics externalKRaftMetrics
     ) {
         this.log = log;
         this.voterSetHistory = new VoterSetHistory(staticVoterSet, logContext);
         this.serde = serde;
         this.bufferSupplier = bufferSupplier;
         this.maxBatchSizeBytes = maxBatchSizeBytes;
         this.logger = logContext.logger(this.getClass());
+        this.kafkaRaftMetrics = kafkaRaftMetrics;
+        this.externalKRaftMetrics = externalKRaftMetrics;
+        this.staticVoterSet = Optional.ofNullable(staticVoterSet);
+
+        this.staticVoterSet.ifPresent(voters -> kafkaRaftMetrics.updateNumVoters(voters.size()));

Review Comment:
   Take a look at `truncateNewEntries`. The number of voters may also get updated because of log end truncation. `truncateOldEntries` should never truncate the latest entry but please take a look at that code to confirm.
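
To illustrate the truncation concern above, here is a rough sketch under stated assumptions. The class below is a hypothetical, simplified history (log offset mapped to voter count) and is not the real `VoterSetHistory` or `KRaftControlRecordStateMachine`. The only behavior it borrows is the idea that `truncateNewEntries` drops the newest entries, assumed here to mean every entry at or after the given end offset, so the reported number of voters may need refreshing afterwards:

```java
import java.util.Optional;
import java.util.TreeMap;

public class VoterCountAfterTruncationSketch {
    // Hypothetical history: log offset -> number of voters in the voter set written at that offset.
    private final TreeMap<Long, Integer> history = new TreeMap<>();
    // Hypothetical stand-in for the size of the static voter set, if one is configured.
    private final Optional<Integer> staticVoterCount;
    // Stand-in for the value a metrics object would expose.
    private int numVotersMetric = 0;

    VoterCountAfterTruncationSketch(Optional<Integer> staticVoterCount) {
        this.staticVoterCount = staticVoterCount;
        refreshMetric();
    }

    /** Records a voter set of the given size at the given offset. */
    void addAt(long offset, int voterCount) {
        history.put(offset, voterCount);
        refreshMetric();
    }

    /** Drops every entry at or after endOffset, then re-reports the voter count. */
    void truncateNewEntries(long endOffset) {
        history.tailMap(endOffset, true).clear();
        refreshMetric();
    }

    /** Uses the latest entry in the history, falling back to the static voter set. */
    private void refreshMetric() {
        if (!history.isEmpty()) {
            numVotersMetric = history.lastEntry().getValue();
        } else {
            numVotersMetric = staticVoterCount.orElse(0);
        }
    }

    int numVoters() {
        return numVotersMetric;
    }

    public static void main(String[] args) {
        VoterCountAfterTruncationSketch sketch = new VoterCountAfterTruncationSketch(Optional.of(3));
        sketch.addAt(10L, 4);
        sketch.addAt(20L, 5);
        sketch.truncateNewEntries(20L);
        System.out.println("voters after truncation: " + sketch.numVoters());
    }
}
```

Without the refresh inside `truncateNewEntries`, the metric would keep reporting the size of a voter set that is no longer in the log.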