[ https://issues.apache.org/jira/browse/KAFKA-13556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Andrei Lakhmanets updated KAFKA-13556:
--------------------------------------
Description:
Hi team,

*Kafka version:* 2.8.1
*Configuration:* 3 Kafka brokers in different availability zones and 3 ZooKeeper nodes in different availability zones.

I ran into a bug where the Kafka cluster loses its controller; if any non-controller broker is restarted after that, it stops processing data.

*Context:*
The Kafka cluster is configured to use SASL for connections.
{code:java}
# Listeners config
listeners=SASL_SSL://[::]:9091,SASL_PLAINTEXT://[::]:9092
advertised.listeners=SASL_SSL://---DOMAIN---:9091,SASL_PLAINTEXT://---DOMAIN---:9092
sasl.enabled.mechanisms=SCRAM-SHA-512
inter.broker.listener.name=SASL_SSL
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
inter.broker.protocol.version=2.8-IV0
# Keystore config
ssl.keystore.location=/etc/kafka/ssl/server.keystore.jks
ssl.keystore.password=---PASSWORD---
ssl.key.password=---PASSWORD---
ssl.truststore.location=/etc/kafka/ssl/server.truststore.jks
ssl.truststore.password=---PASSWORD---
zookeeper.connect=---ZK_HOST_1---:2181,---ZK_HOST_2---:2181,---ZK_HOST_3---:2181
zookeeper.connection.timeout.ms=6000
{code}
ZooKeeper has no SASL configuration and accepts connections without any authentication. So when I start the Kafka brokers, the logs show an "Auth failed" error, but then the ZK client falls back to an unauthenticated connection and everything works fine.
{code:java}
[2021-12-16 14:04:08,451] INFO [ZooKeeperClient Kafka server] Waiting until connected. (kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:04:08,462] WARN SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/etc/kafka/kafka_server_jaas.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it. (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:04:08,466] DEBUG [ZooKeeperClient Kafka server] Received event: WatchedEvent state:AuthFailed type:None path:null (kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:04:08,467] ERROR [ZooKeeperClient Kafka server] Auth failed. (kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:04:08,470] INFO Opening socket connection to server ===ZOOKEEPER_HOST_1===:2181 (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:04:08,478] INFO Socket connection established, initiating session, client: /===BROKER_2_IP===:48022, server: ===ZOOKEEPER_HOST_1===:2181 (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:04:08,496] INFO Session establishment complete on server ===ZOOKEEPER_HOST_1===:2181, sessionid = 0x10028c111930125, negotiated timeout = 18000 (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:04:08,497] DEBUG [ZooKeeperClient Kafka server] Received event: WatchedEvent state:SyncConnected type:None path:null (kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:04:08,498] INFO [ZooKeeperClient Kafka server] Connected. (kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:04:08,753] INFO [feature-zk-node-event-process-thread]: Starting (kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread)
{code}
Then I restart the ZK node that holds the connection with the Kafka controller node (===ZOOKEEPER_HOST_1===), or it restarts for some other reason, and the logs show the following:
{code:java}
[2021-12-16 14:08:42,951] INFO Unable to read additional data from server sessionid 0x10028c111930126, likely server has closed socket, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:42,951] INFO Unable to read additional data from server sessionid 0x10028c111930125, likely server has closed socket, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:43,052] DEBUG [ZooKeeperClient ACL authorizer] Received event: WatchedEvent state:Disconnected type:None path:null (kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:43,053] DEBUG [ZooKeeperClient Kafka server] Received event: WatchedEvent state:Disconnected type:None path:null (kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:43,583] WARN SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/etc/kafka/kafka_server_jaas.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it. (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:43,583] INFO Opening socket connection to server ===ZOOKEEPER_HOST_3===:2181 (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:43,583] DEBUG [ZooKeeperClient ACL authorizer] Received event: WatchedEvent state:AuthFailed type:None path:null (kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:43,584] ERROR [ZooKeeperClient ACL authorizer] Auth failed. (kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:43,593] INFO Socket connection established, initiating session, client: /===BROKER_2_IP===:48560, server: ===ZOOKEEPER_HOST_3===:2181 (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:43,808] INFO Unable to read additional data from server sessionid 0x10028c111930126, likely server has closed socket, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:43,910] DEBUG [ZooKeeperClient ACL authorizer] Received event: WatchedEvent state:Disconnected type:None path:null (kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:44,019] WARN SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/etc/kafka/kafka_server_jaas.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it. (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:44,019] INFO Opening socket connection to server ===ZOOKEEPER_HOST_2===:2181 (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:44,019] DEBUG [ZooKeeperClient Kafka server] Received event: WatchedEvent state:AuthFailed type:None path:null (kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:44,019] ERROR [ZooKeeperClient Kafka server] Auth failed. (kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:44,022] INFO Socket connection established, initiating session, client: /===BROKER_2_IP===:37734, server: ===ZOOKEEPER_HOST_2===:2181 (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:44,023] INFO Unable to read additional data from server sessionid 0x10028c111930125, likely server has closed socket, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:44,123] DEBUG [ZooKeeperClient Kafka server] Received event: WatchedEvent state:Disconnected type:None path:null (kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:44,124] INFO [ZooKeeperClient Kafka server] Waiting until connected. (kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:44,125] INFO [ZooKeeperClient Kafka server] Waiting until connected. (kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:44,586] INFO [ZooKeeperClient ACL authorizer] Reinitializing due to auth failure. (kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:44,704] WARN SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/etc/kafka/kafka_server_jaas.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it. (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:44,704] INFO Opening socket connection to server ===ZOOKEEPER_HOST_2===:2181 (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:44,705] DEBUG [ZooKeeperClient ACL authorizer] Received event: WatchedEvent state:AuthFailed type:None path:null (kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:44,705] ERROR [ZooKeeperClient ACL authorizer] Auth failed. (kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:44,707] INFO Socket connection established, initiating session, client: /===BROKER_2_IP===:37738, server: ===ZOOKEEPER_HOST_2===:2181 (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:44,709] INFO Session establishment complete on server ===ZOOKEEPER_HOST_2===:2181, sessionid = 0x10028c111930126, negotiated timeout = 18000 (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:44,710] DEBUG [ZooKeeperClient ACL authorizer] Received event: WatchedEvent state:SyncConnected type:None path:null (kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:44,798] WARN SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/etc/kafka/kafka_server_jaas.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it. (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:44,798] INFO Opening socket connection to server ===ZOOKEEPER_HOST_3===:2181 (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:44,798] DEBUG [ZooKeeperClient Kafka server] Received event: WatchedEvent state:AuthFailed type:None path:null (kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:44,798] ERROR [ZooKeeperClient Kafka server] Auth failed. (kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:44,807] INFO Socket connection established, initiating session, client: /===BROKER_2_IP===:48568, server: ===ZOOKEEPER_HOST_3===:2181 (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:44,824] INFO Session establishment complete on server ===ZOOKEEPER_HOST_3===:2181, sessionid = 0x10028c111930125, negotiated timeout = 18000 (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:44,824] DEBUG [ZooKeeperClient Kafka server] Received event: WatchedEvent state:SyncConnected type:None path:null (kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:44,824] INFO [ZooKeeperClient Kafka server] Connected. (kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:44,824] INFO [ZooKeeperClient Kafka server] Connected. (kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:45,020] INFO [ZooKeeperClient Kafka server] Reinitializing due to auth failure. (kafka.zookeeper.ZooKeeperClient)
{code}
The main points here are:
1. The log above is from the Kafka controller node.
2. The connection to ZK was lost: _"Received event: WatchedEvent state:Disconnected"_
3. The connection was re-established with the live ZK nodes: _"Opening socket connection to server ===ZOOKEEPER_HOST_3===:2181"_ and _"Received event: WatchedEvent state:SyncConnected"_
4. During the connection process we get the _"Auth failed."_ error, just as when the brokers started.
5. The "Auth failed" error is handled in *kafka.zookeeper.ZooKeeperClient.ZooKeeperClientWatcher.process*, which calls:
{code:java}
if (initialized)
  scheduleReinitialize("auth-failed", "Reinitializing due to auth failure.", RetryBackoffMs)
{code}
where RetryBackoffMs is 1000. One second after the "Auth failed" message, the message "{_}Reinitializing due to auth failure.{_}" appears in the log. The method *ZooKeeperClient.scheduleReinitialize* calls {*}ZooKeeperClient.reinitialize{*}.
{code:java}
private def reinitialize(): Unit = {
  // Initialization callbacks are invoked outside of the lock to avoid deadlock potential since their completion
  // may require additional Zookeeper requests, which will block to acquire the initialization lock
  stateChangeHandlers.values.foreach(callBeforeInitializingSession _)

  inWriteLock(initializationLock) {
    if (!connectionState.isAlive) {
      zooKeeper.close()
      info(s"Initializing a new session to $connectString.")
      // retry forever until ZooKeeper can be instantiated
      var connected = false
      while (!connected) {
        try {
          zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher, clientConfig)
          connected = true
        } catch {
          case e: Exception =>
            info("Error when recreating ZooKeeper, retrying after a short sleep", e)
            Thread.sleep(RetryBackoffMs)
        }
      }
    }
  }

  stateChangeHandlers.values.foreach(callAfterInitializingSession _)
}
{code}
The session-recreation code inside the "{_}inWriteLock(initializationLock){_}" block does not run, because the connection has already been re-established with the live ZK nodes, so the {_}connectionState.isAlive{_} check skips it (I did not find the message s"{_}Initializing a new session to $connectString.{_}" in the logs). But "{*}callBeforeInitializingSession{*}" still calls "{*}KafkaController.startup.beforeInitializingSession{*}", which fires the "{*}ControllerEvent.Expire{*}" event, and "{*}callAfterInitializingSession{*}" calls "{*}KafkaController.startup.afterInitializingSession{*}", which fires the "{*}ControllerEvent.RegisterBrokerAndReelect{*}" event.
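The asymmetry described above can be illustrated with a small standalone model. This is only a sketch under stated assumptions: the class {{ReinitializeSketch}}, its {{StateChangeHandler}} interface, and the {{firedEvents}} helper are hypothetical names invented for illustration and are not Kafka code. It shows that the before/after callbacks fire on every reinitialize, while a new session is created only when the connection is not alive.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the reinitialize() flow quoted above.
// None of these types are Kafka classes; the names are illustrative only.
class ReinitializeSketch {

    // Stand-in for a registered state change handler (e.g. the controller's).
    interface StateChangeHandler {
        void beforeInitializingSession();
        void afterInitializingSession();
    }

    private final List<StateChangeHandler> handlers = new ArrayList<>();
    private final boolean connectionAlive;
    private int sessionsCreated = 0;

    ReinitializeSketch(boolean connectionAlive) {
        this.connectionAlive = connectionAlive;
    }

    void register(StateChangeHandler handler) {
        handlers.add(handler);
    }

    int sessionsCreated() {
        return sessionsCreated;
    }

    void reinitialize() {
        // The "before" callbacks run unconditionally, ahead of the liveness check.
        for (StateChangeHandler h : handlers) {
            h.beforeInitializingSession();
        }
        // Only a connection that is not alive gets a new session.
        if (!connectionAlive) {
            sessionsCreated++;
        }
        // The "after" callbacks also run unconditionally.
        for (StateChangeHandler h : handlers) {
            h.afterInitializingSession();
        }
    }

    // Registers a controller-like handler, runs one reinitialize(), and returns the events it fired.
    static List<String> firedEvents(ReinitializeSketch client) {
        List<String> events = new ArrayList<>();
        client.register(new StateChangeHandler() {
            @Override public void beforeInitializingSession() { events.add("Expire"); }
            @Override public void afterInitializingSession() { events.add("RegisterBrokerAndReelect"); }
        });
        client.reinitialize();
        return events;
    }

    public static void main(String[] args) {
        // Connection already re-established, as in the controller log above.
        ReinitializeSketch client = new ReinitializeSketch(true);
        System.out.println(firedEvents(client) + ", new sessions: " + client.sessionsCreated());
    }
}
```

In this model, an alive connection still produces both controller events but no new session, which matches the absence of the "Initializing a new session" message in the log.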
The "{*}ControllerEvent.Expire{*}" event calls the "{*}KafkaController.processExpire{*}" method, which shuts down the current controller, and the logs show:
{code:java}
[2021-12-16 14:08:45,022] DEBUG [Controller id=2] Resigning (kafka.controller.KafkaController)
[2021-12-16 14:08:45,022] DEBUG [Controller id=2] Unregister BrokerModifications handler for Set(1, 2, 3) (kafka.controller.KafkaController)
[2021-12-16 14:08:45,023] INFO [PartitionStateMachine controllerId=2] Stopped partition state machine (kafka.controller.ZkPartitionStateMachine)
[2021-12-16 14:08:45,025] INFO [ReplicaStateMachine controllerId=2] Stopped replica state machine (kafka.controller.ZkReplicaStateMachine)
[2021-12-16 14:08:45,026] INFO [RequestSendThread controllerId=2] Shutting down (kafka.controller.RequestSendThread)
[2021-12-16 14:08:45,026] INFO [RequestSendThread controllerId=2] Stopped (kafka.controller.RequestSendThread)
[2021-12-16 14:08:45,026] INFO [RequestSendThread controllerId=2] Shutdown completed (kafka.controller.RequestSendThread)
[2021-12-16 14:08:45,031] INFO [RequestSendThread controllerId=2] Shutting down (kafka.controller.RequestSendThread)
[2021-12-16 14:08:45,032] INFO [RequestSendThread controllerId=2] Stopped (kafka.controller.RequestSendThread)
[2021-12-16 14:08:45,033] INFO [RequestSendThread controllerId=2] Shutdown completed (kafka.controller.RequestSendThread)
[2021-12-16 14:08:45,037] INFO [RequestSendThread controllerId=2] Shutting down (kafka.controller.RequestSendThread)
[2021-12-16 14:08:45,037] INFO [RequestSendThread controllerId=2] Stopped (kafka.controller.RequestSendThread)
[2021-12-16 14:08:45,037] INFO [RequestSendThread controllerId=2] Shutdown completed (kafka.controller.RequestSendThread)
[2021-12-16 14:08:45,040] INFO [Controller id=2] Resigned (kafka.controller.KafkaController)
{code}
Then the "{*}ControllerEvent.RegisterBrokerAndReelect{*}" event calls "{*}KafkaController.processRegisterBrokerAndReelect{*}" => "{*}KafkaController.elect{*}". But the logs show:
{code:java}
[2021-12-16 14:08:45,129] DEBUG [Controller id=2] Broker 2 has been elected as the controller, so stopping the election process. (kafka.controller.KafkaController)
{code}
This is the part of the *"KafkaController.elect"* method that writes the message above:
{code:java}
private def elect(): Unit = {
  activeControllerId = zkClient.getControllerId.getOrElse(-1)
  /*
   * We can get here during the initial startup and the handleDeleted ZK callback. Because of the potential race condition,
   * it's possible that the controller has already been elected when we get here. This check will prevent the following
   * createEphemeralPath method from getting into an infinite loop if this broker is already the controller.
   */
  if (activeControllerId != -1) {
    debug(s"Broker $activeControllerId has been elected as the controller, so stopping the election process.")
    return
  }
{code}
The problem is: the current controller was shut down by the "{*}KafkaController.processExpire{*}" event, but a new one was not elected because ZK did not remove the /controller node (presumably because session 0x10028c111930125 was resumed rather than expired, so the ephemeral node stayed in place). So the logs show that Controller id=2 resigned and then stopped the election because it believes it is still the live controller, although all of its ZK listeners have already been shut down. If I then restart any non-controller broker (or one restarts for any other reason), it does not get metadata because there is no controller in the cluster.
And if we try to consume data from this brokers we get next errors: {code:java} /opt/kafka/bin/kafka-console-consumer.sh --topic test_topic --from-beginning --bootstrap-server $(hostname):9091 --consumer.config ~/connect.properties WARN [Consumer clientId=consumer-console-consumer-57293-1, groupId=console-consumer-57293] Error while fetching metadata with correlation id 2 : {test_topic=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient) WARN [Consumer clientId=consumer-console-consumer-57293-1, groupId=console-consumer-57293] Error while fetching metadata with correlation id 3 : {test_topic=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient){code} UPD: Workaround for this bug: # Setup proper SASL connection for ZK # Disable using SASL connection via property "-Dzookeeper.sasl.client=false" was: Hi team, *Kafka version:* 2.8.1 *Configuration:* 3 kafka brokers in different availability zones and 3 zookeeper brokers in different availability zones. I faced with a bug when kafka cluster loses the controller and if after that restart any none controller broker then it stops processing data. *Context:* The kafka cluster has SASL configuration for connection. {code:java} # Listeners config listeners=SASL_SSL://[::]:9091,SASL_PLAINTEXT://[::]:9092 advertised.listeners=SASL_SSL://---DOMAIN---:9091,SASL_PLAINTEXT://---DOMAIN---:9092sasl.enabled.mechanisms=SCRAM-SHA-512 inter.broker.listener.name=SASL_SSL sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512 inter.broker.protocol.version=2.8-IV0 # Keystore config ssl.keystore.location=/etc/kafka/ssl/server.keystore.jks ssl.keystore.password=---PASSWORD--- ssl.key.password=---PASSWORD--- ssl.truststore.location=/etc/kafka/ssl/server.truststore.jks ssl.truststore.password=---PASSWORD--- zookeeper.connect=---ZK_HOST_1---:2181,---ZK_HOST_2---:2181,---ZK_HOST_3---:2181 zookeeper.connection.timeout.ms=6000 {code} Zookeeper doesn't have SASL configuration and uses connection without any authentication. 
So, when I start kafka brokers I see in logs error about auth failed but then ZK client switches to connection without authentication and all works fine. {code:java} [2021-12-16 14:04:08,451] INFO [ZooKeeperClient Kafka server] Waiting until connected. (kafka.zookeeper.ZooKeeperClient) [2021-12-16 14:04:08,462] WARN SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/etc/kafka/kafka_server_jaas.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it. (org.apache.zookeeper.ClientCnxn) [2021-12-16 14:04:08,466] DEBUG [ZooKeeperClient Kafka server] Received event: WatchedEvent state:AuthFailed type:None path:null (kafka.zookeeper.ZooKeeperClient) [2021-12-16 14:04:08,467] ERROR [ZooKeeperClient Kafka server] Auth failed. (kafka.zookeeper.ZooKeeperClient) [2021-12-16 14:04:08,470] INFO Opening socket connection to server ===ZOOKEEPER_HOST_1===:2181 (org.apache.zookeeper.ClientCnxn) [2021-12-16 14:04:08,478] INFO Socket connection established, initiating session, client: /===BROKER_2_IP===:48022, server: ===ZOOKEEPER_HOST_1===:2181 (org.apache.zookeeper.ClientCnxn) [2021-12-16 14:04:08,496] INFO Session establishment complete on server ===ZOOKEEPER_HOST_1===:2181, sessionid = 0x10028c111930125, negotiated timeout = 18000 (org.apache.zookeeper.ClientCnxn) [2021-12-16 14:04:08,497] DEBUG [ZooKeeperClient Kafka server] Received event: WatchedEvent state:SyncConnected type:None path:null (kafka.zookeeper.ZooKeeperClient) [2021-12-16 14:04:08,498] INFO [ZooKeeperClient Kafka server] Connected. 
(kafka.zookeeper.ZooKeeperClient) [2021-12-16 14:04:08,753] INFO [feature-zk-node-event-process-thread]: Starting (kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread) {code} Then I restart ZK node (or it restarts somehow) which holds connection with kafka controller node (===ZOOKEEPER_HOST_1===) and in logs I see next: {code:java} [2021-12-16 14:08:42,951] INFO Unable to read additional data from server sessionid 0x10028c111930126, likely server has closed socket, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn) [2021-12-16 14:08:42,951] INFO Unable to read additional data from server sessionid 0x10028c111930125, likely server has closed socket, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)[2021-12-16 14:08:43,052] DEBUG [ZooKeeperClient ACL authorizer] Received event: WatchedEvent state:Disconnected type:None path:null (kafka.zookeeper.ZooKeeperClient) [2021-12-16 14:08:43,053] DEBUG [ZooKeeperClient Kafka server] Received event: WatchedEvent state:Disconnected type:None path:null (kafka.zookeeper.ZooKeeperClient)[2021-12-16 14:08:43,583] WARN SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/etc/kafka/kafka_server_jaas.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it. (org.apache.zookeeper.ClientCnxn) [2021-12-16 14:08:43,583] INFO Opening socket connection to server ===ZOOKEEPER_HOST_3===:2181 (org.apache.zookeeper.ClientCnxn) [2021-12-16 14:08:43,583] DEBUG [ZooKeeperClient ACL authorizer] Received event: WatchedEvent state:AuthFailed type:None path:null (kafka.zookeeper.ZooKeeperClient) [2021-12-16 14:08:43,584] ERROR [ZooKeeperClient ACL authorizer] Auth failed. 
(kafka.zookeeper.ZooKeeperClient) [2021-12-16 14:08:43,593] INFO Socket connection established, initiating session, client: /===BROKER_2_IP===:48560, server: ===ZOOKEEPER_HOST_3===:2181 (org.apache.zookeeper.ClientCnxn) [2021-12-16 14:08:43,808] INFO Unable to read additional data from server sessionid 0x10028c111930126, likely server has closed socket, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn) [2021-12-16 14:08:43,910] DEBUG [ZooKeeperClient ACL authorizer] Received event: WatchedEvent state:Disconnected type:None path:null (kafka.zookeeper.ZooKeeperClient) [2021-12-16 14:08:44,019] WARN SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/etc/kafka/kafka_server_jaas.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it. (org.apache.zookeeper.ClientCnxn) [2021-12-16 14:08:44,019] INFO Opening socket connection to server ===ZOOKEEPER_HOST_2===:2181 (org.apache.zookeeper.ClientCnxn) [2021-12-16 14:08:44,019] DEBUG [ZooKeeperClient Kafka server] Received event: WatchedEvent state:AuthFailed type:None path:null (kafka.zookeeper.ZooKeeperClient) [2021-12-16 14:08:44,019] ERROR [ZooKeeperClient Kafka server] Auth failed. 
(kafka.zookeeper.ZooKeeperClient) [2021-12-16 14:08:44,022] INFO Socket connection established, initiating session, client: /===BROKER_2_IP===:37734, server: ===ZOOKEEPER_HOST_2===:2181 (org.apache.zookeeper.ClientCnxn) [2021-12-16 14:08:44,023] INFO Unable to read additional data from server sessionid 0x10028c111930125, likely server has closed socket, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn) [2021-12-16 14:08:44,123] DEBUG [ZooKeeperClient Kafka server] Received event: WatchedEvent state:Disconnected type:None path:null (kafka.zookeeper.ZooKeeperClient)[2021-12-16 14:08:44,124] INFO [ZooKeeperClient Kafka server] Waiting until connected. (kafka.zookeeper.ZooKeeperClient) [2021-12-16 14:08:44,125] INFO [ZooKeeperClient Kafka server] Waiting until connected. (kafka.zookeeper.ZooKeeperClient)[2021-12-16 14:08:44,586] INFO [ZooKeeperClient ACL authorizer] Reinitializing due to auth failure. (kafka.zookeeper.ZooKeeperClient) [2021-12-16 14:08:44,704] WARN SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/etc/kafka/kafka_server_jaas.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it. (org.apache.zookeeper.ClientCnxn) [2021-12-16 14:08:44,704] INFO Opening socket connection to server ===ZOOKEEPER_HOST_2===:2181 (org.apache.zookeeper.ClientCnxn) [2021-12-16 14:08:44,705] DEBUG [ZooKeeperClient ACL authorizer] Received event: WatchedEvent state:AuthFailed type:None path:null (kafka.zookeeper.ZooKeeperClient) [2021-12-16 14:08:44,705] ERROR [ZooKeeperClient ACL authorizer] Auth failed. 
(kafka.zookeeper.ZooKeeperClient) [2021-12-16 14:08:44,707] INFO Socket connection established, initiating session, client: /===BROKER_2_IP===:37738, server: ===ZOOKEEPER_HOST_2===:2181 (org.apache.zookeeper.ClientCnxn) [2021-12-16 14:08:44,709] INFO Session establishment complete on server ===ZOOKEEPER_HOST_2===:2181, sessionid = 0x10028c111930126, negotiated timeout = 18000 (org.apache.zookeeper.ClientCnxn) [2021-12-16 14:08:44,710] DEBUG [ZooKeeperClient ACL authorizer] Received event: WatchedEvent state:SyncConnected type:None path:null (kafka.zookeeper.ZooKeeperClient) [2021-12-16 14:08:44,798] WARN SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/etc/kafka/kafka_server_jaas.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it. (org.apache.zookeeper.ClientCnxn) [2021-12-16 14:08:44,798] INFO Opening socket connection to server ===ZOOKEEPER_HOST_3===:2181 (org.apache.zookeeper.ClientCnxn) [2021-12-16 14:08:44,798] DEBUG [ZooKeeperClient Kafka server] Received event: WatchedEvent state:AuthFailed type:None path:null (kafka.zookeeper.ZooKeeperClient) [2021-12-16 14:08:44,798] ERROR [ZooKeeperClient Kafka server] Auth failed. (kafka.zookeeper.ZooKeeperClient) [2021-12-16 14:08:44,807] INFO Socket connection established, initiating session, client: /===BROKER_2_IP===:48568, server: ===ZOOKEEPER_HOST_3===:2181 (org.apache.zookeeper.ClientCnxn) [2021-12-16 14:08:44,824] INFO Session establishment complete on server ===ZOOKEEPER_HOST_3===:2181, sessionid = 0x10028c111930125, negotiated timeout = 18000 (org.apache.zookeeper.ClientCnxn) [2021-12-16 14:08:44,824] DEBUG [ZooKeeperClient Kafka server] Received event: WatchedEvent state:SyncConnected type:None path:null (kafka.zookeeper.ZooKeeperClient)[2021-12-16 14:08:44,824] INFO [ZooKeeperClient Kafka server] Connected. 
(kafka.zookeeper.ZooKeeperClient) [2021-12-16 14:08:44,824] INFO [ZooKeeperClient Kafka server] Connected. (kafka.zookeeper.ZooKeeperClient)[2021-12-16 14:08:45,020] INFO [ZooKeeperClient Kafka server] Reinitializing due to auth failure. (kafka.zookeeper.ZooKeeperClient) {code} The main points here are next: 1. The log above is from kafka controller node. 2. Connection to ZK was lost _"Received event: WatchedEvent state:Disconnected"_ 3. Connection was established with live ZK nodes _"Opening socket connection to server ===ZOOKEEPER_HOST_3===:2181"_ and _"Received event: WatchedEvent state:SyncConnected"_ 4. During connection process we get _"Auth failed."_ error as it was during starting of brokers. 5. The error "Auth failed" is catched in the code: *kafka.zookeeper.ZooKeeperClient.ZooKeeperClientWatcher.process* and call code: {code:java} if (initialized) scheduleReinitialize("auth-failed", "Reinitializing due to auth failure.", RetryBackoffMs) {code} where RetryBackoffMs is 1000. In a second after "Auth failed" message we see message "{_}Reinitializing due to auth failure.{_}" in the log. The method *ZooKeeperClient.scheduleReinitialize* calls {*}ZooKeeperClient.reinitialize{*}. 
{code:java} private def reinitialize(): Unit = { // Initialization callbacks are invoked outside of the lock to avoid deadlock potential since their completion // may require additional Zookeeper requests, which will block to acquire the initialization lock stateChangeHandlers.values.foreach(callBeforeInitializingSession _) inWriteLock(initializationLock) { if (!connectionState.isAlive) { zooKeeper.close() info(s"Initializing a new session to $connectString.") // retry forever until ZooKeeper can be instantiated var connected = false while (!connected) { try { zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher, clientConfig) connected = true } catch { case e: Exception => info("Error when recreating ZooKeeper, retrying after a short sleep", e) Thread.sleep(RetryBackoffMs) } } } } stateChangeHandlers.values.foreach(callAfterInitializingSession _) } {code} The code inside "{_}inWriteLock(initializationLock){_}" block is not running, because the connection already was establised with live ZK nodes (in logs I didn't find the message s"{_}Initializing a new session to $connectString.{_}"). But the code "{*}callBeforeInitializingSession{*}" calls "{*}KafkaController.startup.beforeInitializingSession{*}" which fires "{*}ControllerEvent.Expire{*}" event and the code "{*}callAfterInitializingSession"{*} calls "{*}KafkaController.startup.afterInitializingSession{*}" and fires "{*}ControllerEvent.RegisterBrokerAndReelect{*}" event. 
The event "{*}ControllerEvent.Expire{*}" call "{*}KafkaController.processExpire"{*} method which shutting down the current controller and in logs we see next: {code:java} [2021-12-16 14:08:45,022] DEBUG [Controller id=2] Resigning (kafka.controller.KafkaController) [2021-12-16 14:08:45,022] DEBUG [Controller id=2] Unregister BrokerModifications handler for Set(1, 2, 3) (kafka.controller.KafkaController) [2021-12-16 14:08:45,023] INFO [PartitionStateMachine controllerId=2] Stopped partition state machine (kafka.controller.ZkPartitionStateMachine) [2021-12-16 14:08:45,025] INFO [ReplicaStateMachine controllerId=2] Stopped replica state machine (kafka.controller.ZkReplicaStateMachine) [2021-12-16 14:08:45,026] INFO [RequestSendThread controllerId=2] Shutting down (kafka.controller.RequestSendThread) [2021-12-16 14:08:45,026] INFO [RequestSendThread controllerId=2] Stopped (kafka.controller.RequestSendThread) [2021-12-16 14:08:45,026] INFO [RequestSendThread controllerId=2] Shutdown completed (kafka.controller.RequestSendThread) [2021-12-16 14:08:45,031] INFO [RequestSendThread controllerId=2] Shutting down (kafka.controller.RequestSendThread) [2021-12-16 14:08:45,032] INFO [RequestSendThread controllerId=2] Stopped (kafka.controller.RequestSendThread) [2021-12-16 14:08:45,033] INFO [RequestSendThread controllerId=2] Shutdown completed (kafka.controller.RequestSendThread) [2021-12-16 14:08:45,037] INFO [RequestSendThread controllerId=2] Shutting down (kafka.controller.RequestSendThread) [2021-12-16 14:08:45,037] INFO [RequestSendThread controllerId=2] Stopped (kafka.controller.RequestSendThread) [2021-12-16 14:08:45,037] INFO [RequestSendThread controllerId=2] Shutdown completed (kafka.controller.RequestSendThread) [2021-12-16 14:08:45,040] INFO [Controller id=2] Resigned (kafka.controller.KafkaController) {code} And then "{*}ControllerEvent.RegisterBrokerAndReelect{*}" event calls method "{*}KafkaController.processRegisterBrokerAndReelect{*}" => 
"{*}KafkaController.elect{*}". But in the logs there is next: {code:java} [2021-12-16 14:08:45,129] DEBUG [Controller id=2] Broker 2 has been elected as the controller, so stopping the election process. (kafka.controller.KafkaController) {code} There is a part of the method *"KafkaController.elect"* which write the message above {code:java} private def elect(): Unit = { activeControllerId = zkClient.getControllerId.getOrElse(-1) /* * We can get here during the initial startup and the handleDeleted ZK callback. Because of the potential race condition, * it's possible that the controller has already been elected when we get here. This check will prevent the following * createEphemeralPath method from getting into an infinite loop if this broker is already the controller. */ if (activeControllerId != -1) { debug(s"Broker $activeControllerId has been elected as the controller, so stopping the election process.") return } {code} The problem is: The current controller was shutting down because of "{*}KafkaController.processExpire{*}" event but a new one wasn't elected because ZK didn't clean /controller node. So in logs we see that Controller id=2 was resigned and then stopped election because think that it is still a live controller, but all listeners of ZK already shutted down. If after that I restart any non controller brokers (or they were restarted by any reason) then they don't get metadata because there is no the controller in cluster. 
And if we try to consume data from these brokers, we get the following errors:
{code:java}
/opt/kafka/bin/kafka-console-consumer.sh --topic test_topic --from-beginning --bootstrap-server $(hostname):9091 --consumer.config ~/connect.properties
WARN [Consumer clientId=consumer-console-consumer-57293-1, groupId=console-consumer-57293] Error while fetching metadata with correlation id 2 : {test_topic=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient)
WARN [Consumer clientId=consumer-console-consumer-57293-1, groupId=console-consumer-57293] Error while fetching metadata with correlation id 3 : {test_topic=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient)
{code}
UPD: Workaround for this bug:
# Set up a proper SASL connection for ZK
# Disable the SASL connection via the property "-Dzookeeper.sasl.client=false"

> Kafka cluster loses the controller
> ----------------------------------
>
> Key: KAFKA-13556
> URL: https://issues.apache.org/jira/browse/KAFKA-13556
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 2.8.1
> Reporter: Andrei Lakhmanets
> Priority: Major

--
This message was sent by Atlassian Jira
(v8.20.1#820001)