mimaison commented on code in PR #12913:
URL: https://github.com/apache/kafka/pull/12913#discussion_r1036885976


##########
core/src/main/scala/kafka/server/KafkaServer.scala:
##########
@@ -490,25 +575,19 @@ class KafkaServer(
 
   private def initZkClient(time: Time): Unit = {
     info(s"Connecting to zookeeper on ${config.zkConnect}")
-
-    val secureAclsEnabled = config.zkEnableSecureAcls
-    val isZkSecurityEnabled = JaasUtils.isZkSaslEnabled() || 
KafkaConfig.zkTlsClientAuthEnabled(zkClientConfig)
-
-    if (secureAclsEnabled && !isZkSecurityEnabled)
-      throw new 
java.lang.SecurityException(s"${KafkaConfig.ZkEnableSecureAclsProp} is true, 
but ZooKeeper client TLS configuration identifying at least 
$KafkaConfig.ZkSslClientEnableProp, $KafkaConfig.ZkClientCnxnSocketProp, and 
$KafkaConfig.ZkSslKeyStoreLocationProp was not present and the " +
-        s"verification of the JAAS login file failed 
${JaasUtils.zkSecuritySysConfigString}")
-
-    _zkClient = KafkaZkClient(config.zkConnect, secureAclsEnabled, 
config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
-      config.zkMaxInFlightRequests, time, name = "Kafka server", 
zkClientConfig = zkClientConfig,
-      createChrootIfNecessary = true)
+    _zkClient = KafkaServer.zkClient("Kafka server", time, config, 
zkClientConfig)
     _zkClient.createTopLevelPaths()
   }
 
   private def getOrGenerateClusterId(zkClient: KafkaZkClient): String = {
     
zkClient.getClusterId.getOrElse(zkClient.createOrGetClusterId(CoreUtils.generateUuidAsBase64()))
   }
 
-  def createBrokerInfo: BrokerInfo = {
+  def createBrokerInfo(): BrokerInfo = {

Review Comment:
   Can this be removed now?



##########
core/src/main/scala/kafka/migration/OffsetTrackingListener.scala:
##########
@@ -0,0 +1,27 @@
+package kafka.migration

Review Comment:
   It's missing the license



##########
core/src/main/scala/kafka/server/KafkaServer.scala:
##########
@@ -685,11 +765,29 @@ class KafkaServer(
       // We request the controller to do a controlled shutdown. On failure, we 
backoff for a configured period
       // of time and try again for a configured number of retries. If all the 
attempt fails, we simply force
       // the shutdown.
-      info("Starting controlled shutdown")
-
       _brokerState = BrokerState.PENDING_CONTROLLED_SHUTDOWN
 
-      val shutdownSucceeded = 
doControlledShutdown(config.controlledShutdownMaxRetries.intValue)
+      val shutdownSucceeded = if (lifecycleManager != null &&
+                                  lifecycleManager.state == 
BrokerState.RUNNING) {
+        // TODO see if RUNNING is a sufficient check here. Probably need to 
check if we have a ZK/KRaft controller

Review Comment:
   Is this tracked in another JIRA?



##########
core/src/main/scala/kafka/server/KafkaServer.scala:
##########
@@ -452,6 +531,12 @@ class KafkaServer(
         dynamicConfigManager = new ZkConfigManager(zkClient, 
dynamicConfigHandlers)
         dynamicConfigManager.startup()
 
+        if (lifecycleManager != null) {

Review Comment:
   Is this tracked in another JIRA?



##########
core/src/main/scala/kafka/server/KafkaServer.scala:
##########
@@ -81,6 +85,19 @@ object KafkaServer {
     clientConfig
   }
 
+  def zkClient(name: String, time: Time, config: KafkaConfig, zkClientConfig: 
ZKClientConfig): KafkaZkClient = {
+    val secureAclsEnabled = config.zkEnableSecureAcls
+    val isZkSecurityEnabled = JaasUtils.isZkSaslEnabled || 
KafkaConfig.zkTlsClientAuthEnabled(zkClientConfig)
+
+    if (secureAclsEnabled && !isZkSecurityEnabled)
+      throw new 
java.lang.SecurityException(s"${KafkaConfig.ZkEnableSecureAclsProp} is true, 
but ZooKeeper client TLS configuration identifying at least 
$KafkaConfig.ZkSslClientEnableProp, $KafkaConfig.ZkClientCnxnSocketProp, and 
$KafkaConfig.ZkSslKeyStoreLocationProp was not present and the " +

Review Comment:
   It's missing curly brackets around the variables so this would not print 
them correctly



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to