mimaison commented on code in PR #12913: URL: https://github.com/apache/kafka/pull/12913#discussion_r1036885976
########## core/src/main/scala/kafka/server/KafkaServer.scala: ########## @@ -490,25 +575,19 @@ class KafkaServer( private def initZkClient(time: Time): Unit = { info(s"Connecting to zookeeper on ${config.zkConnect}") - - val secureAclsEnabled = config.zkEnableSecureAcls - val isZkSecurityEnabled = JaasUtils.isZkSaslEnabled() || KafkaConfig.zkTlsClientAuthEnabled(zkClientConfig) - - if (secureAclsEnabled && !isZkSecurityEnabled) - throw new java.lang.SecurityException(s"${KafkaConfig.ZkEnableSecureAclsProp} is true, but ZooKeeper client TLS configuration identifying at least $KafkaConfig.ZkSslClientEnableProp, $KafkaConfig.ZkClientCnxnSocketProp, and $KafkaConfig.ZkSslKeyStoreLocationProp was not present and the " + - s"verification of the JAAS login file failed ${JaasUtils.zkSecuritySysConfigString}") - - _zkClient = KafkaZkClient(config.zkConnect, secureAclsEnabled, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, - config.zkMaxInFlightRequests, time, name = "Kafka server", zkClientConfig = zkClientConfig, - createChrootIfNecessary = true) + _zkClient = KafkaServer.zkClient("Kafka server", time, config, zkClientConfig) _zkClient.createTopLevelPaths() } private def getOrGenerateClusterId(zkClient: KafkaZkClient): String = { zkClient.getClusterId.getOrElse(zkClient.createOrGetClusterId(CoreUtils.generateUuidAsBase64())) } - def createBrokerInfo: BrokerInfo = { + def createBrokerInfo(): BrokerInfo = { Review Comment: Can this be removed now? ########## core/src/main/scala/kafka/migration/OffsetTrackingListener.scala: ########## @@ -0,0 +1,27 @@ +package kafka.migration Review Comment: It's missing the license ########## core/src/main/scala/kafka/server/KafkaServer.scala: ########## @@ -685,11 +765,29 @@ class KafkaServer( // We request the controller to do a controlled shutdown. On failure, we backoff for a configured period // of time and try again for a configured number of retries. If all the attempt fails, we simply force // the shutdown. - info("Starting controlled shutdown") - _brokerState = BrokerState.PENDING_CONTROLLED_SHUTDOWN - val shutdownSucceeded = doControlledShutdown(config.controlledShutdownMaxRetries.intValue) + val shutdownSucceeded = if (lifecycleManager != null && + lifecycleManager.state == BrokerState.RUNNING) { + // TODO see if RUNNING is a sufficient check here. Probably need to check if we have a ZK/KRaft controller Review Comment: Is this tracked in another JIRA? ########## core/src/main/scala/kafka/server/KafkaServer.scala: ########## @@ -452,6 +531,12 @@ class KafkaServer( dynamicConfigManager = new ZkConfigManager(zkClient, dynamicConfigHandlers) dynamicConfigManager.startup() + if (lifecycleManager != null) { Review Comment: Is this tracked in another JIRA? ########## core/src/main/scala/kafka/server/KafkaServer.scala: ########## @@ -81,6 +85,19 @@ object KafkaServer { clientConfig } + def zkClient(name: String, time: Time, config: KafkaConfig, zkClientConfig: ZKClientConfig): KafkaZkClient = { + val secureAclsEnabled = config.zkEnableSecureAcls + val isZkSecurityEnabled = JaasUtils.isZkSaslEnabled || KafkaConfig.zkTlsClientAuthEnabled(zkClientConfig) + + if (secureAclsEnabled && !isZkSecurityEnabled) + throw new java.lang.SecurityException(s"${KafkaConfig.ZkEnableSecureAclsProp} is true, but ZooKeeper client TLS configuration identifying at least $KafkaConfig.ZkSslClientEnableProp, $KafkaConfig.ZkClientCnxnSocketProp, and $KafkaConfig.ZkSslKeyStoreLocationProp was not present and the " + Review Comment: It's missing curly brackets around the variables so this would not print them correctly -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org