hachikuji commented on code in PR #11969:
URL: https://github.com/apache/kafka/pull/11969#discussion_r868426504


##########
core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala:
##########
@@ -1486,13 +1486,16 @@ class KafkaConfigTest {
     assertEquals("3", originals.get(KafkaConfig.NodeIdProp))
   }
 
+  val kraftProps = new Properties()

Review Comment:
   nit: it would be a little nicer to make this a def. We don't need the 
properties after they are initialized.



##########
core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala:
##########
@@ -1509,4 +1512,29 @@ class KafkaConfigTest {
     
assertNotNull(config.getLong(KafkaConfig.SaslOAuthBearerJwksEndpointRetryBackoffMsProp))
     
assertNotNull(config.getLong(KafkaConfig.SaslOAuthBearerJwksEndpointRetryBackoffMaxMsProp))
   }
+
+  @Test

Review Comment:
   May be useful testing that the default uses the controller listeners?
   
   As a side comment, I wonder if the default of the controller listeners 
should only apply when the standard authorizer is in use? Or perhaps a warning 
of some kind if we have a non-standard authorizer and `early.start.listeners` 
is not set.



##########
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java:
##########
@@ -78,6 +77,24 @@ public AclMutator aclMutatorOrException() {
         return aclMutator;
     }
 
+    @Override
+    public synchronized void completeInitialLoad() {
+        data = data.copyWithNewLoadingComplete(true);
+        data.log.info("Completed initial ACL load process.");
+        initialLoadFuture.complete(null);
+    }
+
+    // Visible for testing

Review Comment:
   nit: can probably make it default access?



##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -921,12 +925,35 @@ private void appendRaftEvent(String name, Runnable 
runnable) {
                 if (this != metaLogListener) {
                     log.debug("Ignoring {} raft event from an old 
registration", name);
                 } else {
-                    runnable.run();
+                    try {
+                        runnable.run();
+                    } finally {
+                        maybeCompleteAuthorizerInitialLoad();
+                    }
                 }
             });
         }
     }
 
+    private void maybeCompleteAuthorizerInitialLoad() {
+        if (!needToCompleteAuthorizerLoad) return;
+        OptionalLong highWatermark = raftClient.highWatermark();
+        if (highWatermark.isPresent()) {
+            if (lastCommittedOffset + 1 >= highWatermark.getAsLong()) {
+                log.debug("maybeCompleteAuthorizerInitialLoad: completing 
authorizer " +

Review Comment:
   I think it would be reasonable to have an INFO message about this event. It 
only happens once and I suspect it will be useful to know when it completed.



##########
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java:
##########
@@ -251,6 +273,9 @@ public AuthorizationResult authorize(
         if (superUsers.contains(principal.toString())) {
             rule = SuperUserRule.INSTANCE;
         } else {
+            if (!loadingComplete) {

Review Comment:
   nit: maybe pull this up one level?
   ```
   } else if (!loadingComplete) {
     throw new AuthorizerNotReadyException();
   } else {
   ...



##########
core/src/main/scala/kafka/server/KafkaConfig.scala:
##########
@@ -725,7 +727,11 @@ object KafkaConfig {
 
   /************* Authorizer Configuration ***********/
   val AuthorizerClassNameDoc = s"The fully qualified name of a class that 
implements <code>${classOf[Authorizer].getName}</code>" +
-  " interface, which is used by the broker for authorization."
+    " interface, which is used by the broker for authorization."
+  val EarlyStartListenersDoc = "A comma-separated list of listener names which 
should be started before any non-early start listeners. " +

Review Comment:
   nit: the first sentence is seems circular. I think it would be helpful to 
mention the authorizer use case. Also, can we say something about the default?



##########
core/src/main/scala/kafka/network/SocketServer.scala:
##########
@@ -1864,6 +1780,18 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, 
metrics: Metrics) extend
       sensor
     }
   }
+
+  /**
+   * Close `channel` and decrement the connection count.
+   */
+  def closeChannel(listenerName: ListenerName, channel: SocketChannel): Unit = 
{
+    if (channel != null) {
+      debug(s"Closing connection from 
${channel.socket.getRemoteSocketAddress}")
+      dec(listenerName, channel.socket.getInetAddress)
+      closeSocket(channel, this)

Review Comment:
   Yes, I'm not disputing the need to register the close event with 
`ConnectionQuotas`. But I think it is a strange inversion of responsibility to 
have it close the socket as well. The `Processor` and `Acceptor` classes are 
the ones that own the socket lifecycle, while the `ConnectionQuotas` instance 
is just a parameter. The original implementation was more intuitive. 
   
   It is admittedly a small thing, but I do think the logging issue matters. 
When we are debugging events in system test failures, for example, grepping the 
logger is a common strategy. It is surprising to need to include the connection 
quota object to see all of the close events.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to