[ 
https://issues.apache.org/jira/browse/KAFKA-13556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrei Lakhmanets updated KAFKA-13556:
--------------------------------------
    Description: 
Hi team,

*Kafka version:* 2.8.1
*Configuration:* 3 Kafka brokers in different availability zones and 3
ZooKeeper nodes in different availability zones.

I ran into a bug where the Kafka cluster loses its controller. If any
non-controller broker is restarted after that, it stops processing data.

*Context:*
The Kafka cluster is configured with SASL listeners.
{code:java}
# Listeners config
listeners=SASL_SSL://[::]:9091,SASL_PLAINTEXT://[::]:9092
advertised.listeners=SASL_SSL://---DOMAIN---:9091,SASL_PLAINTEXT://---DOMAIN---:9092
sasl.enabled.mechanisms=SCRAM-SHA-512
inter.broker.listener.name=SASL_SSL
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
inter.broker.protocol.version=2.8-IV0
# Keystore config
ssl.keystore.location=/etc/kafka/ssl/server.keystore.jks
ssl.keystore.password=---PASSWORD---
ssl.key.password=---PASSWORD---
ssl.truststore.location=/etc/kafka/ssl/server.truststore.jks
ssl.truststore.password=---PASSWORD--- 

zookeeper.connect=---ZK_HOST_1---:2181,---ZK_HOST_2---:2181,---ZK_HOST_3---:2181
zookeeper.connection.timeout.ms=6000 {code}
ZooKeeper has no SASL configuration and accepts connections without any
authentication.

So when I start the Kafka brokers, the logs show an "Auth failed" error, but
then the ZK client falls back to an unauthenticated connection and everything
works fine.
{code:java}
[2021-12-16 14:04:08,451] INFO [ZooKeeperClient Kafka server] Waiting until 
connected. (kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:04:08,462] WARN SASL configuration failed: 
javax.security.auth.login.LoginException: No JAAS configuration section named 
'Client' was found in specified JAAS configuration file: 
'/etc/kafka/kafka_server_jaas.conf'. Will continue connection to Zookeeper 
server without SASL authentication, if Zookeeper server allows it. 
(org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:04:08,466] DEBUG [ZooKeeperClient Kafka server] Received event: 
WatchedEvent state:AuthFailed type:None path:null 
(kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:04:08,467] ERROR [ZooKeeperClient Kafka server] Auth failed. 
(kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:04:08,470] INFO Opening socket connection to server 
===ZOOKEEPER_HOST_1===:2181 (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:04:08,478] INFO Socket connection established, initiating 
session, client: /===BROKER_2_IP===:48022, server: ===ZOOKEEPER_HOST_1===:2181 
(org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:04:08,496] INFO Session establishment complete on server 
===ZOOKEEPER_HOST_1===:2181, sessionid = 0x10028c111930125, negotiated timeout 
= 18000 (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:04:08,497] DEBUG [ZooKeeperClient Kafka server] Received event: 
WatchedEvent state:SyncConnected type:None path:null 
(kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:04:08,498] INFO [ZooKeeperClient Kafka server] Connected. 
(kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:04:08,753] INFO [feature-zk-node-event-process-thread]: Starting 
(kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread) 
{code}
Then I restart the ZK node that holds the connection with the Kafka controller
node (===ZOOKEEPER_HOST_1===), or it restarts for some other reason, and the
logs show the following:
{code:java}
[2021-12-16 14:08:42,951] INFO Unable to read additional data from server 
sessionid 0x10028c111930126, likely server has closed socket, closing socket 
connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:42,951] INFO Unable to read additional data from server 
sessionid 0x10028c111930125, likely server has closed socket, closing socket 
connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:43,052] DEBUG [ZooKeeperClient ACL authorizer] Received 
event: WatchedEvent state:Disconnected type:None path:null 
(kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:43,053] DEBUG [ZooKeeperClient Kafka server] Received event: 
WatchedEvent state:Disconnected type:None path:null 
(kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:43,583] WARN SASL configuration failed: 
javax.security.auth.login.LoginException: No JAAS configuration section named 
'Client' was found in specified JAAS configuration file: 
'/etc/kafka/kafka_server_jaas.conf'. Will continue connection to Zookeeper 
server without SASL authentication, if Zookeeper server allows it. 
(org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:43,583] INFO Opening socket connection to server 
===ZOOKEEPER_HOST_3===:2181 (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:43,583] DEBUG [ZooKeeperClient ACL authorizer] Received 
event: WatchedEvent state:AuthFailed type:None path:null 
(kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:43,584] ERROR [ZooKeeperClient ACL authorizer] Auth failed. 
(kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:43,593] INFO Socket connection established, initiating 
session, client: /===BROKER_2_IP===:48560, server: ===ZOOKEEPER_HOST_3===:2181 
(org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:43,808] INFO Unable to read additional data from server 
sessionid 0x10028c111930126, likely server has closed socket, closing socket 
connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:43,910] DEBUG [ZooKeeperClient ACL authorizer] Received 
event: WatchedEvent state:Disconnected type:None path:null 
(kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:44,019] WARN SASL configuration failed: 
javax.security.auth.login.LoginException: No JAAS configuration section named 
'Client' was found in specified JAAS configuration file: 
'/etc/kafka/kafka_server_jaas.conf'. Will continue connection to Zookeeper 
server without SASL authentication, if Zookeeper server allows it. 
(org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:44,019] INFO Opening socket connection to server 
===ZOOKEEPER_HOST_2===:2181 (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:44,019] DEBUG [ZooKeeperClient Kafka server] Received event: 
WatchedEvent state:AuthFailed type:None path:null 
(kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:44,019] ERROR [ZooKeeperClient Kafka server] Auth failed. 
(kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:44,022] INFO Socket connection established, initiating 
session, client: /===BROKER_2_IP===:37734, server: ===ZOOKEEPER_HOST_2===:2181 
(org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:44,023] INFO Unable to read additional data from server 
sessionid 0x10028c111930125, likely server has closed socket, closing socket 
connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:44,123] DEBUG [ZooKeeperClient Kafka server] Received event: 
WatchedEvent state:Disconnected type:None path:null 
(kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:44,124] INFO [ZooKeeperClient Kafka server] Waiting until 
connected. (kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:44,125] INFO [ZooKeeperClient Kafka server] Waiting until 
connected. (kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:44,586] INFO [ZooKeeperClient ACL authorizer] Reinitializing 
due to auth failure. (kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:44,704] WARN SASL configuration failed: 
javax.security.auth.login.LoginException: No JAAS configuration section named 
'Client' was found in specified JAAS configuration file: 
'/etc/kafka/kafka_server_jaas.conf'. Will continue connection to Zookeeper 
server without SASL authentication, if Zookeeper server allows it. 
(org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:44,704] INFO Opening socket connection to server 
===ZOOKEEPER_HOST_2===:2181 (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:44,705] DEBUG [ZooKeeperClient ACL authorizer] Received 
event: WatchedEvent state:AuthFailed type:None path:null 
(kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:44,705] ERROR [ZooKeeperClient ACL authorizer] Auth failed. 
(kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:44,707] INFO Socket connection established, initiating 
session, client: /===BROKER_2_IP===:37738, server: ===ZOOKEEPER_HOST_2===:2181 
(org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:44,709] INFO Session establishment complete on server 
===ZOOKEEPER_HOST_2===:2181, sessionid = 0x10028c111930126, negotiated timeout 
= 18000 (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:44,710] DEBUG [ZooKeeperClient ACL authorizer] Received 
event: WatchedEvent state:SyncConnected type:None path:null 
(kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:44,798] WARN SASL configuration failed: 
javax.security.auth.login.LoginException: No JAAS configuration section named 
'Client' was found in specified JAAS configuration file: 
'/etc/kafka/kafka_server_jaas.conf'. Will continue connection to Zookeeper 
server without SASL authentication, if Zookeeper server allows it. 
(org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:44,798] INFO Opening socket connection to server 
===ZOOKEEPER_HOST_3===:2181 (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:44,798] DEBUG [ZooKeeperClient Kafka server] Received event: 
WatchedEvent state:AuthFailed type:None path:null 
(kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:44,798] ERROR [ZooKeeperClient Kafka server] Auth failed. 
(kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:44,807] INFO Socket connection established, initiating 
session, client: /===BROKER_2_IP===:48568, server: ===ZOOKEEPER_HOST_3===:2181 
(org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:44,824] INFO Session establishment complete on server 
===ZOOKEEPER_HOST_3===:2181, sessionid = 0x10028c111930125, negotiated timeout 
= 18000 (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:44,824] DEBUG [ZooKeeperClient Kafka server] Received event: 
WatchedEvent state:SyncConnected type:None path:null 
(kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:44,824] INFO [ZooKeeperClient Kafka server] Connected. 
(kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:44,824] INFO [ZooKeeperClient Kafka server] Connected. 
(kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:45,020] INFO [ZooKeeperClient Kafka server] Reinitializing 
due to auth failure. (kafka.zookeeper.ZooKeeperClient) {code}
The main points here are:
1. The log above is from the Kafka controller node.
2. The connection to ZK was lost: _"Received event: WatchedEvent state:Disconnected"_
3. The connection was re-established with the live ZK nodes: _"Opening socket 
connection to server ===ZOOKEEPER_HOST_3===:2181"_ and _"Received event: 
WatchedEvent state:SyncConnected"_
4. During reconnection we get the same _"Auth failed."_ error as during broker 
startup.
5. The "Auth failed" event is handled in 
*kafka.zookeeper.ZooKeeperClient.ZooKeeperClientWatcher.process*, which runs 
this code:
{code:java}
if (initialized)
    scheduleReinitialize("auth-failed", "Reinitializing due to auth failure.", RetryBackoffMs) {code}
where RetryBackoffMs is 1000.

One second after the "Auth failed" message, the log shows the message 
"{_}Reinitializing due to auth failure.{_}".
The method *ZooKeeperClient.scheduleReinitialize* calls 
{*}ZooKeeperClient.reinitialize{*}:

 
{code:java}
  private def reinitialize(): Unit = {
    // Initialization callbacks are invoked outside of the lock to avoid deadlock potential since their completion
    // may require additional Zookeeper requests, which will block to acquire the initialization lock
    stateChangeHandlers.values.foreach(callBeforeInitializingSession _)

    inWriteLock(initializationLock) {
      if (!connectionState.isAlive) {
        zooKeeper.close()
        info(s"Initializing a new session to $connectString.")
        // retry forever until ZooKeeper can be instantiated
        var connected = false
        while (!connected) {
          try {
            zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher, clientConfig)
            connected = true
          } catch {
            case e: Exception =>
              info("Error when recreating ZooKeeper, retrying after a short sleep", e)
              Thread.sleep(RetryBackoffMs)
          }
        }
      }
    }

    stateChangeHandlers.values.foreach(callAfterInitializingSession _)
  } {code}
The code inside the "{_}inWriteLock(initializationLock){_}" block does not run, 
because the connection to the live ZK nodes has already been re-established, so 
connectionState.isAlive is true (the message s"{_}Initializing a new session to 
$connectString.{_}" never appears in the logs).
However, "{*}callBeforeInitializingSession{*}" still calls 
"{*}KafkaController.startup.beforeInitializingSession{*}", which fires the 
"{*}ControllerEvent.Expire{*}" event, and 
"{*}callAfterInitializingSession{*}" still calls 
"{*}KafkaController.startup.afterInitializingSession{*}", which fires the 
"{*}ControllerEvent.RegisterBrokerAndReelect{*}" event.
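The flow described above can be sketched as a small standalone model. This is only an illustration under simplifying assumptions: the class ReinitializeSketch, the Handler interface, and the "new-session" trace entry are hypothetical names, not Kafka's classes. It shows that both callbacks run whether or not the session is re-created.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of the reinitialize() flow; not Kafka's actual classes. */
public class ReinitializeSketch {

    /** Hypothetical stand-in for a registered state change handler. */
    interface Handler {
        void beforeInitializingSession();
        void afterInitializingSession();
    }

    /**
     * Mirrors the shape of reinitialize(): the callbacks run unconditionally,
     * while the session is re-created only if it is no longer alive.
     * Returns the ordered list of steps that were executed.
     */
    static List<String> reinitialize(boolean sessionAlive, Handler handler, List<String> trace) {
        handler.beforeInitializingSession();
        if (!sessionAlive) {
            trace.add("new-session");
        }
        handler.afterInitializingSession();
        return trace;
    }

    /** Runs the flow with a handler that records the controller events it would fire. */
    static List<String> run(boolean sessionAlive) {
        List<String> trace = new ArrayList<>();
        Handler controllerHandler = new Handler() {
            @Override public void beforeInitializingSession() { trace.add("Expire"); }
            @Override public void afterInitializingSession() { trace.add("RegisterBrokerAndReelect"); }
        };
        return reinitialize(sessionAlive, controllerHandler, trace);
    }

    public static void main(String[] args) {
        System.out.println("session alive:   " + run(true));
        System.out.println("session expired: " + run(false));
    }
}
```

With a live session the trace contains only the two controller events, which corresponds to the case in this report: Expire is fired although no new session is created.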

The "{*}ControllerEvent.Expire{*}" event calls the 
"{*}KafkaController.processExpire{*}" method, which shuts down the current 
controller, and the logs show the following:
{code:java}
[2021-12-16 14:08:45,022] DEBUG [Controller id=2] Resigning 
(kafka.controller.KafkaController)
[2021-12-16 14:08:45,022] DEBUG [Controller id=2] Unregister 
BrokerModifications handler for Set(1, 2, 3) (kafka.controller.KafkaController)
[2021-12-16 14:08:45,023] INFO [PartitionStateMachine controllerId=2] Stopped 
partition state machine (kafka.controller.ZkPartitionStateMachine)
[2021-12-16 14:08:45,025] INFO [ReplicaStateMachine controllerId=2] Stopped 
replica state machine (kafka.controller.ZkReplicaStateMachine)
[2021-12-16 14:08:45,026] INFO [RequestSendThread controllerId=2] Shutting down 
(kafka.controller.RequestSendThread)
[2021-12-16 14:08:45,026] INFO [RequestSendThread controllerId=2] Stopped 
(kafka.controller.RequestSendThread)
[2021-12-16 14:08:45,026] INFO [RequestSendThread controllerId=2] Shutdown 
completed (kafka.controller.RequestSendThread)
[2021-12-16 14:08:45,031] INFO [RequestSendThread controllerId=2] Shutting down 
(kafka.controller.RequestSendThread)
[2021-12-16 14:08:45,032] INFO [RequestSendThread controllerId=2] Stopped 
(kafka.controller.RequestSendThread)
[2021-12-16 14:08:45,033] INFO [RequestSendThread controllerId=2] Shutdown 
completed (kafka.controller.RequestSendThread)
[2021-12-16 14:08:45,037] INFO [RequestSendThread controllerId=2] Shutting down 
(kafka.controller.RequestSendThread)
[2021-12-16 14:08:45,037] INFO [RequestSendThread controllerId=2] Stopped 
(kafka.controller.RequestSendThread)
[2021-12-16 14:08:45,037] INFO [RequestSendThread controllerId=2] Shutdown 
completed (kafka.controller.RequestSendThread)
[2021-12-16 14:08:45,040] INFO [Controller id=2] Resigned 
(kafka.controller.KafkaController) {code}
Then the "{*}ControllerEvent.RegisterBrokerAndReelect{*}" event calls the 
method "{*}KafkaController.processRegisterBrokerAndReelect{*}" => 
"{*}KafkaController.elect{*}". But the logs show the following:

 
{code:java}
[2021-12-16 14:08:45,129] DEBUG [Controller id=2] Broker 2 has been elected as 
the controller, so stopping the election process. 
(kafka.controller.KafkaController) {code}
This message is written by the following part of the *KafkaController.elect* 
method:
{code:java}
  private def elect(): Unit = {
    activeControllerId = zkClient.getControllerId.getOrElse(-1)
    /*
     * We can get here during the initial startup and the handleDeleted ZK callback. Because of the potential race condition,
     * it's possible that the controller has already been elected when we get here. This check will prevent the following
     * createEphemeralPath method from getting into an infinite loop if this broker is already the controller.
     */
    if (activeControllerId != -1) {
      debug(s"Broker $activeControllerId has been elected as the controller, so stopping the election process.")
      return
    } {code}
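The problem is:
The current controller shut itself down while handling the 
"{*}ControllerEvent.Expire{*}" event (in "{*}KafkaController.processExpire{*}"), 
but no new controller was elected because ZK did not remove the /controller 
node. The session ID in the logs above is unchanged after the reconnect, so the 
session never expired and the ephemeral node is still there.

So the logs show that Controller id=2 resigned and then stopped the election 
because it believes it is still the live controller, even though all of its ZK 
listeners have already been shut down.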
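A toy model of this stuck state, as a sketch only: the class StaleControllerSketch and its fields are hypothetical and merely imitate the behaviour described above (an Expire event that resigns locally, and an elect step that gives up when the /controller node already exists).

```java
import java.util.Optional;

/** Toy model of the stuck state; names are illustrative, not Kafka's classes. */
public class StaleControllerSketch {
    static final int BROKER_ID = 2;

    // Stand-in for the ephemeral /controller znode: holds the owning broker id.
    private Optional<Integer> controllerZnode = Optional.of(BROKER_ID);
    // Whether this broker is actually doing controller work.
    private boolean controllerActive = true;

    /** Models the Expire event: the broker resigns locally, the znode is untouched. */
    void processExpire() {
        controllerActive = false;
    }

    /** Models a real session expiry, where ZooKeeper deletes ephemeral nodes. */
    void expireSession() {
        controllerZnode = Optional.empty();
    }

    /** Models elect(): give up if /controller exists, otherwise claim it. */
    void elect() {
        if (controllerZnode.isPresent()) {
            return; // "Broker N has been elected as the controller, so stopping ..."
        }
        controllerZnode = Optional.of(BROKER_ID);
        controllerActive = true;
    }

    /** Replays Expire followed by re-election and reports whether a controller is active. */
    static boolean hasActiveControllerAfterReinit(boolean sessionReallyExpired) {
        StaleControllerSketch broker = new StaleControllerSketch();
        broker.processExpire();
        if (sessionReallyExpired) {
            broker.expireSession();
        }
        broker.elect();
        return broker.controllerActive;
    }

    public static void main(String[] args) {
        System.out.println("session expired: active=" + hasActiveControllerAfterReinit(true));
        System.out.println("session alive:   active=" + hasActiveControllerAfterReinit(false));
    }
}
```

In this model a real session expiry ends with an active controller again, while the auth-failure path with a live session ends with the znode present and nobody acting as controller.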

If any non-controller broker is restarted after that (for whatever reason), it 
does not receive metadata because there is no active controller in the 
cluster.
If we then try to consume data from such a broker, we get the following errors:
{code:java}
/opt/kafka/bin/kafka-console-consumer.sh --topic test_topic --from-beginning \
  --bootstrap-server $(hostname):9091 --consumer.config ~/connect.properties

WARN [Consumer clientId=consumer-console-consumer-57293-1, 
groupId=console-consumer-57293] Error while fetching metadata with correlation 
id 2 : {test_topic=UNKNOWN_TOPIC_OR_PARTITION} 
(org.apache.kafka.clients.NetworkClient)
WARN [Consumer clientId=consumer-console-consumer-57293-1, 
groupId=console-consumer-57293] Error while fetching metadata with correlation 
id 3 : {test_topic=UNKNOWN_TOPIC_OR_PARTITION} 
(org.apache.kafka.clients.NetworkClient){code}
UPD: Workarounds for this bug (either one):
 # Set up a proper SASL connection to ZK
 # Disable SASL for the ZK client via the JVM property "-Dzookeeper.sasl.client=false"
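To spot brokers that are exposed to this problem, a preflight check along the following lines might help. It is a hypothetical helper, not part of Kafka or ZooKeeper: the class ZkSaslPreflight and its methods are invented for illustration, the regular expression only covers the common "Client {" layout of a JAAS file, and only the value "false" from the workaround above is treated as disabling SASL.

```java
import java.util.regex.Pattern;

/** Hypothetical preflight check; not part of Kafka or ZooKeeper. */
public class ZkSaslPreflight {
    // Matches a JAAS section header such as "Client {" at the start of a line.
    private static final Pattern CLIENT_SECTION = Pattern.compile("(?m)^\\s*Client\\s*\\{");

    /** True if the JAAS file text declares a 'Client' section. */
    static boolean hasClientSection(String jaasText) {
        return CLIENT_SECTION.matcher(jaasText).find();
    }

    /**
     * True if the setup matches the one in this report: SASL for the ZK client
     * is not switched off (the zookeeper.sasl.client value is not "false")
     * while the JAAS file has no 'Client' section.
     */
    static boolean isAtRisk(String jaasText, String zkSaslClientProperty) {
        boolean saslEnabled = zkSaslClientProperty == null
                || !zkSaslClientProperty.trim().equalsIgnoreCase("false");
        return saslEnabled && !hasClientSection(jaasText);
    }

    public static void main(String[] args) {
        // Example JAAS content with a broker section only, as in this report.
        String jaas = "KafkaServer {\n"
                + "  org.apache.kafka.common.security.scram.ScramLoginModule required;\n"
                + "};\n";
        System.out.println("at risk: " + isAtRisk(jaas, System.getProperty("zookeeper.sasl.client")));
    }
}
```

Either workaround makes the check pass: adding a Client section to the JAAS file, or starting the JVM with "-Dzookeeper.sasl.client=false".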

 

  was:
Hi team,

*Kafka version:* 2.8.1
*Configuration:* 3 kafka brokers in different availability zones and 3 
zookeeper brokers in different availability zones.

I faced with a bug when kafka cluster loses the controller and if after that 
restart any none controller broker then it stops processing data.

*Context:*
The kafka cluster has SASL configuration for connection.
{code:java}
# Listeners config
listeners=SASL_SSL://[::]:9091,SASL_PLAINTEXT://[::]:9092
advertised.listeners=SASL_SSL://---DOMAIN---:9091,SASL_PLAINTEXT://---DOMAIN---:9092sasl.enabled.mechanisms=SCRAM-SHA-512
inter.broker.listener.name=SASL_SSL
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
inter.broker.protocol.version=2.8-IV0
# Keystore config
ssl.keystore.location=/etc/kafka/ssl/server.keystore.jks
ssl.keystore.password=---PASSWORD---
ssl.key.password=---PASSWORD---
ssl.truststore.location=/etc/kafka/ssl/server.truststore.jks
ssl.truststore.password=---PASSWORD--- 

zookeeper.connect=---ZK_HOST_1---:2181,---ZK_HOST_2---:2181,---ZK_HOST_3---:2181
zookeeper.connection.timeout.ms=6000 {code}
Zookeeper doesn't have SASL configuration and uses connection without any 
authentication.

So, when I start kafka brokers I see in logs error about auth failed but then 
ZK client switches to connection without authentication and all works fine.
{code:java}
[2021-12-16 14:04:08,451] INFO [ZooKeeperClient Kafka server] Waiting until 
connected. (kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:04:08,462] WARN SASL configuration failed: 
javax.security.auth.login.LoginException: No JAAS configuration section named 
'Client' was found in specified JAAS configuration file: 
'/etc/kafka/kafka_server_jaas.conf'. Will continue connection to Zookeeper 
server without SASL authentication, if Zookeeper server allows it. 
(org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:04:08,466] DEBUG [ZooKeeperClient Kafka server] Received event: 
WatchedEvent state:AuthFailed type:None path:null 
(kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:04:08,467] ERROR [ZooKeeperClient Kafka server] Auth failed. 
(kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:04:08,470] INFO Opening socket connection to server 
===ZOOKEEPER_HOST_1===:2181 (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:04:08,478] INFO Socket connection established, initiating 
session, client: /===BROKER_2_IP===:48022, server: ===ZOOKEEPER_HOST_1===:2181 
(org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:04:08,496] INFO Session establishment complete on server 
===ZOOKEEPER_HOST_1===:2181, sessionid = 0x10028c111930125, negotiated timeout 
= 18000 (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:04:08,497] DEBUG [ZooKeeperClient Kafka server] Received event: 
WatchedEvent state:SyncConnected type:None path:null 
(kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:04:08,498] INFO [ZooKeeperClient Kafka server] Connected. 
(kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:04:08,753] INFO [feature-zk-node-event-process-thread]: Starting 
(kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread) 
{code}
Then I restart ZK node (or it restarts somehow) which holds connection with 
kafka controller node (===ZOOKEEPER_HOST_1===) and in logs I see next:
{code:java}
[2021-12-16 14:08:42,951] INFO Unable to read additional data from server 
sessionid 0x10028c111930126, likely server has closed socket, closing socket 
connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:42,951] INFO Unable to read additional data from server 
sessionid 0x10028c111930125, likely server has closed socket, closing socket 
connection and attempting reconnect 
(org.apache.zookeeper.ClientCnxn)[2021-12-16 14:08:43,052] DEBUG 
[ZooKeeperClient ACL authorizer] Received event: WatchedEvent 
state:Disconnected type:None path:null (kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:43,053] DEBUG [ZooKeeperClient Kafka server] Received event: 
WatchedEvent state:Disconnected type:None path:null 
(kafka.zookeeper.ZooKeeperClient)[2021-12-16 14:08:43,583] WARN SASL 
configuration failed: javax.security.auth.login.LoginException: No JAAS 
configuration section named 'Client' was found in specified JAAS configuration 
file: '/etc/kafka/kafka_server_jaas.conf'. Will continue connection to 
Zookeeper server without SASL authentication, if Zookeeper server allows it. 
(org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:43,583] INFO Opening socket connection to server 
===ZOOKEEPER_HOST_3===:2181 (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:43,583] DEBUG [ZooKeeperClient ACL authorizer] Received 
event: WatchedEvent state:AuthFailed type:None path:null 
(kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:43,584] ERROR [ZooKeeperClient ACL authorizer] Auth failed. 
(kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:43,593] INFO Socket connection established, initiating 
session, client: /===BROKER_2_IP===:48560, server: ===ZOOKEEPER_HOST_3===:2181 
(org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:43,808] INFO Unable to read additional data from server 
sessionid 0x10028c111930126, likely server has closed socket, closing socket 
connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:43,910] DEBUG [ZooKeeperClient ACL authorizer] Received 
event: WatchedEvent state:Disconnected type:None path:null 
(kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:44,019] WARN SASL configuration failed: 
javax.security.auth.login.LoginException: No JAAS configuration section named 
'Client' was found in specified JAAS configuration file: 
'/etc/kafka/kafka_server_jaas.conf'. Will continue connection to Zookeeper 
server without SASL authentication, if Zookeeper server allows it. 
(org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:44,019] INFO Opening socket connection to server 
===ZOOKEEPER_HOST_2===:2181 (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:44,019] DEBUG [ZooKeeperClient Kafka server] Received event: 
WatchedEvent state:AuthFailed type:None path:null 
(kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:44,019] ERROR [ZooKeeperClient Kafka server] Auth failed. 
(kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:44,022] INFO Socket connection established, initiating 
session, client: /===BROKER_2_IP===:37734, server: ===ZOOKEEPER_HOST_2===:2181 
(org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:44,023] INFO Unable to read additional data from server 
sessionid 0x10028c111930125, likely server has closed socket, closing socket 
connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:44,123] DEBUG [ZooKeeperClient Kafka server] Received event: 
WatchedEvent state:Disconnected type:None path:null 
(kafka.zookeeper.ZooKeeperClient)[2021-12-16 14:08:44,124] INFO 
[ZooKeeperClient Kafka server] Waiting until connected. 
(kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:44,125] INFO [ZooKeeperClient Kafka server] Waiting until 
connected. (kafka.zookeeper.ZooKeeperClient)[2021-12-16 14:08:44,586] INFO 
[ZooKeeperClient ACL authorizer] Reinitializing due to auth failure. 
(kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:44,704] WARN SASL configuration failed: 
javax.security.auth.login.LoginException: No JAAS configuration section named 
'Client' was found in specified JAAS configuration file: 
'/etc/kafka/kafka_server_jaas.conf'. Will continue connection to Zookeeper 
server without SASL authentication, if Zookeeper server allows it. 
(org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:44,704] INFO Opening socket connection to server 
===ZOOKEEPER_HOST_2===:2181 (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:44,705] DEBUG [ZooKeeperClient ACL authorizer] Received 
event: WatchedEvent state:AuthFailed type:None path:null 
(kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:44,705] ERROR [ZooKeeperClient ACL authorizer] Auth failed. 
(kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:44,707] INFO Socket connection established, initiating 
session, client: /===BROKER_2_IP===:37738, server: ===ZOOKEEPER_HOST_2===:2181 
(org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:44,709] INFO Session establishment complete on server 
===ZOOKEEPER_HOST_2===:2181, sessionid = 0x10028c111930126, negotiated timeout 
= 18000 (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:44,710] DEBUG [ZooKeeperClient ACL authorizer] Received 
event: WatchedEvent state:SyncConnected type:None path:null 
(kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:44,798] WARN SASL configuration failed: 
javax.security.auth.login.LoginException: No JAAS configuration section named 
'Client' was found in specified JAAS configuration file: 
'/etc/kafka/kafka_server_jaas.conf'. Will continue connection to Zookeeper 
server without SASL authentication, if Zookeeper server allows it. 
(org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:44,798] INFO Opening socket connection to server 
===ZOOKEEPER_HOST_3===:2181 (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:44,798] DEBUG [ZooKeeperClient Kafka server] Received event: 
WatchedEvent state:AuthFailed type:None path:null 
(kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:44,798] ERROR [ZooKeeperClient Kafka server] Auth failed. 
(kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:44,807] INFO Socket connection established, initiating 
session, client: /===BROKER_2_IP===:48568, server: ===ZOOKEEPER_HOST_3===:2181 
(org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:44,824] INFO Session establishment complete on server 
===ZOOKEEPER_HOST_3===:2181, sessionid = 0x10028c111930125, negotiated timeout 
= 18000 (org.apache.zookeeper.ClientCnxn)
[2021-12-16 14:08:44,824] DEBUG [ZooKeeperClient Kafka server] Received event: 
WatchedEvent state:SyncConnected type:None path:null 
(kafka.zookeeper.ZooKeeperClient)[2021-12-16 14:08:44,824] INFO 
[ZooKeeperClient Kafka server] Connected. (kafka.zookeeper.ZooKeeperClient)
[2021-12-16 14:08:44,824] INFO [ZooKeeperClient Kafka server] Connected. 
(kafka.zookeeper.ZooKeeperClient)[2021-12-16 14:08:45,020] INFO 
[ZooKeeperClient Kafka server] Reinitializing due to auth failure. 
(kafka.zookeeper.ZooKeeperClient) {code}
The main points here are next:
1. The log above is from kafka controller node.
2. Connection to ZK was lost _"Received event: WatchedEvent state:Disconnected"_
3. Connection was established with live ZK nodes _"Opening socket connection to 
server ===ZOOKEEPER_HOST_3===:2181"_ and _"Received event: WatchedEvent 
state:SyncConnected"_
4. During connection process we get _"Auth failed."_ error as it was during 
starting of brokers.
5. The error "Auth failed" is catched in the code: 
*kafka.zookeeper.ZooKeeperClient.ZooKeeperClientWatcher.process* and call code:
{code:java}
if (initialized)
    scheduleReinitialize("auth-failed", "Reinitializing due to auth failure.", 
RetryBackoffMs) {code}
where RetryBackoffMs is 1000.

In a second after "Auth failed" message we see message "{_}Reinitializing due 
to auth failure.{_}" in the log.
The method *ZooKeeperClient.scheduleReinitialize* calls 
{*}ZooKeeperClient.reinitialize{*}.

 
{code:java}
  private def reinitialize(): Unit = {
    // Initialization callbacks are invoked outside of the lock to avoid 
deadlock potential since their completion
    // may require additional Zookeeper requests, which will block to acquire 
the initialization lock
    stateChangeHandlers.values.foreach(callBeforeInitializingSession _)    
inWriteLock(initializationLock) {
      if (!connectionState.isAlive) {
        zooKeeper.close()
        info(s"Initializing a new session to $connectString.")
        // retry forever until ZooKeeper can be instantiated
        var connected = false
        while (!connected) {
          try {
            zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, 
ZooKeeperClientWatcher, clientConfig)
            connected = true
          } catch {
            case e: Exception =>
              info("Error when recreating ZooKeeper, retrying after a short 
sleep", e)
              Thread.sleep(RetryBackoffMs)
          }
        }
      }
    }    stateChangeHandlers.values.foreach(callAfterInitializingSession _)
  } {code}
The code inside "{_}inWriteLock(initializationLock){_}" block is not running, 
because the connection already was establised with live ZK nodes (in logs I 
didn't find the message s"{_}Initializing a new session to $connectString.{_}").
But the code "{*}callBeforeInitializingSession{*}" calls 
"{*}KafkaController.startup.beforeInitializingSession{*}" which fires 
"{*}ControllerEvent.Expire{*}" event and the code 
"{*}callAfterInitializingSession"{*} calls 
"{*}KafkaController.startup.afterInitializingSession{*}" and fires 
"{*}ControllerEvent.RegisterBrokerAndReelect{*}" event.

The event "{*}ControllerEvent.Expire{*}" call 
"{*}KafkaController.processExpire"{*} method which shutting down the current 
controller and in logs we see next:
{code:java}
[2021-12-16 14:08:45,022] DEBUG [Controller id=2] Resigning 
(kafka.controller.KafkaController)
[2021-12-16 14:08:45,022] DEBUG [Controller id=2] Unregister 
BrokerModifications handler for Set(1, 2, 3) (kafka.controller.KafkaController)
[2021-12-16 14:08:45,023] INFO [PartitionStateMachine controllerId=2] Stopped 
partition state machine (kafka.controller.ZkPartitionStateMachine)
[2021-12-16 14:08:45,025] INFO [ReplicaStateMachine controllerId=2] Stopped 
replica state machine (kafka.controller.ZkReplicaStateMachine)
[2021-12-16 14:08:45,026] INFO [RequestSendThread controllerId=2] Shutting down 
(kafka.controller.RequestSendThread)
[2021-12-16 14:08:45,026] INFO [RequestSendThread controllerId=2] Stopped 
(kafka.controller.RequestSendThread)
[2021-12-16 14:08:45,026] INFO [RequestSendThread controllerId=2] Shutdown 
completed (kafka.controller.RequestSendThread)
[2021-12-16 14:08:45,031] INFO [RequestSendThread controllerId=2] Shutting down 
(kafka.controller.RequestSendThread)
[2021-12-16 14:08:45,032] INFO [RequestSendThread controllerId=2] Stopped 
(kafka.controller.RequestSendThread)
[2021-12-16 14:08:45,033] INFO [RequestSendThread controllerId=2] Shutdown 
completed (kafka.controller.RequestSendThread)
[2021-12-16 14:08:45,037] INFO [RequestSendThread controllerId=2] Shutting down 
(kafka.controller.RequestSendThread)
[2021-12-16 14:08:45,037] INFO [RequestSendThread controllerId=2] Stopped 
(kafka.controller.RequestSendThread)
[2021-12-16 14:08:45,037] INFO [RequestSendThread controllerId=2] Shutdown 
completed (kafka.controller.RequestSendThread)
[2021-12-16 14:08:45,040] INFO [Controller id=2] Resigned 
(kafka.controller.KafkaController) {code}
Then the "{*}ControllerEvent.RegisterBrokerAndReelect{*}" event calls the method 
"{*}KafkaController.processRegisterBrokerAndReelect{*}" => 
"{*}KafkaController.elect{*}". But the logs show the following:

 
{code:java}
[2021-12-16 14:08:45,129] DEBUG [Controller id=2] Broker 2 has been elected as 
the controller, so stopping the election process. 
(kafka.controller.KafkaController) {code}
This is the part of the *"KafkaController.elect"* method that writes the message 
above:
{code:java}
  private def elect(): Unit = {
    activeControllerId = zkClient.getControllerId.getOrElse(-1)
    /*
     * We can get here during the initial startup and the handleDeleted ZK
     * callback. Because of the potential race condition, it's possible that
     * the controller has already been elected when we get here. This check
     * will prevent the following createEphemeralPath method from getting into
     * an infinite loop if this broker is already the controller.
     */
    if (activeControllerId != -1) {
      debug(s"Broker $activeControllerId has been elected as the controller, so stopping the election process.")
      return
    } {code}
The problem is:
The current controller was shut down by the 
"{*}KafkaController.processExpire{*}" event, but a new one wasn't elected 
because ZK didn't remove the /controller node.

So the logs show that Controller id=2 resigned and then stopped the election 
because it thinks it is still the live controller, even though all of its ZK 
listeners have already been shut down.
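
The stuck state can be reproduced in a small in-memory model. This is a sketch under stated assumptions, not Kafka's implementation: ZkModel, ControllerModel and scenario are hypothetical names, the /controller znode is modelled as an Optional, and resignation is assumed to leave that znode in place because only the end of the ZK session removes it.

```java
import java.util.Optional;

public class StaleControllerSketch {
    /** Hypothetical in-memory stand-in for the /controller znode. */
    static class ZkModel {
        Optional<Integer> controllerNode = Optional.empty();
    }

    /** Hypothetical, heavily simplified controller. */
    static class ControllerModel {
        final int brokerId;
        final ZkModel zk;
        boolean active = false; // true while this broker acts as controller
        int activeControllerId = -1;

        ControllerModel(int brokerId, ZkModel zk) {
            this.brokerId = brokerId;
            this.zk = zk;
        }

        /** Mirrors the early-return check in the quoted elect() excerpt. */
        String elect() {
            activeControllerId = zk.controllerNode.orElse(-1);
            if (activeControllerId != -1) {
                return "stopped: broker " + activeControllerId + " already elected";
            }
            zk.controllerNode = Optional.of(brokerId);
            activeControllerId = brokerId;
            active = true;
            return "elected: broker " + brokerId;
        }

        /** Resigns locally only; the znode is assumed to stay until the session ends. */
        void resign() {
            active = false;
        }
    }

    /** Elects broker 2, resigns it, then re-runs the election. */
    static String scenario(boolean sessionExpired) {
        ZkModel zk = new ZkModel();
        ControllerModel controller = new ControllerModel(2, zk);
        controller.elect();
        if (sessionExpired) {
            // A real session expiry would make ZK drop the ephemeral znode.
            zk.controllerNode = Optional.empty();
        }
        controller.resign();
        return controller.elect() + ", active=" + controller.active;
    }

    public static void main(String[] args) {
        System.out.println(scenario(false));
        System.out.println(scenario(true));
    }
}
```

When the session never expired, the re-election stops at the stale znode and the broker stays inactive. Only when the znode is gone does the election complete.
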

If I then restart any non-controller broker (or one restarts for any 
reason), it doesn't get metadata because there is no controller in the 
cluster.
If we try to consume data from such a broker, we get the following errors:
{code:java}
/opt/kafka/bin/kafka-console-consumer.sh --topic test_topic --from-beginning 
--bootstrap-server  $(hostname):9091 --consumer.config ~/connect.properties 

WARN [Consumer clientId=consumer-console-consumer-57293-1, 
groupId=console-consumer-57293] Error while fetching metadata with correlation 
id 2 : {test_topic=UNKNOWN_TOPIC_OR_PARTITION} 
(org.apache.kafka.clients.NetworkClient)
WARN [Consumer clientId=consumer-console-consumer-57293-1, 
groupId=console-consumer-57293] Error while fetching metadata with correlation 
id 3 : {test_topic=UNKNOWN_TOPIC_OR_PARTITION} 
(org.apache.kafka.clients.NetworkClient){code}
UPD: Workaround for this bug:
 # Set up a SASL connection for ZK
 # Disable the SASL connection via the property "-Dzookeeper.sasl.client=false"
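
As a rough illustration of the second option, the sketch below shows how a JVM system property such as zookeeper.sasl.client can gate SASL setup. It is an assumption that the ZooKeeper client treats the flag as enabled when the property is unset. The class and method names are hypothetical.

```java
public class SaslClientFlagSketch {
    /** Property name taken from the workaround above. */
    static final String KEY = "zookeeper.sasl.client";

    /** Assumed behavior: the flag counts as enabled when the property is unset. */
    static boolean saslClientEnabled() {
        return Boolean.parseBoolean(System.getProperty(KEY, "true"));
    }

    public static void main(String[] args) {
        // Run with -Dzookeeper.sasl.client=false to see the flag switch off.
        System.out.println("SASL client enabled: " + saslClientEnabled());
    }
}
```

With the flag off, the client would not attempt a SASL login at all, so the AuthFailed event that triggers the reinitialization would not be raised.
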

 


> Kafka cluster loses the controller
> ----------------------------------
>
>                 Key: KAFKA-13556
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13556
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 2.8.1
>            Reporter: Andrei Lakhmanets
>            Priority: Major
>
> Hi team,
> *Kafka version:* 2.8.1
> *Configuration:* 3 kafka brokers in different availability zones and 3 
> zookeeper brokers in different availability zones.
> I ran into a bug where the Kafka cluster loses the controller; if any 
> non-controller broker is restarted after that, it stops processing data.
> *Context:*
> The kafka cluster has SASL configuration for connection.
> {code:java}
> # Listeners config
> listeners=SASL_SSL://[::]:9091,SASL_PLAINTEXT://[::]:9092
> advertised.listeners=SASL_SSL://---DOMAIN---:9091,SASL_PLAINTEXT://---DOMAIN---:9092
> sasl.enabled.mechanisms=SCRAM-SHA-512
> inter.broker.listener.name=SASL_SSL
> sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
> inter.broker.protocol.version=2.8-IV0
> # Keystore config
> ssl.keystore.location=/etc/kafka/ssl/server.keystore.jks
> ssl.keystore.password=---PASSWORD---
> ssl.key.password=---PASSWORD---
> ssl.truststore.location=/etc/kafka/ssl/server.truststore.jks
> ssl.truststore.password=---PASSWORD--- 
> zookeeper.connect=---ZK_HOST_1---:2181,---ZK_HOST_2---:2181,---ZK_HOST_3---:2181
> zookeeper.connection.timeout.ms=6000 {code}
> ZooKeeper has no SASL configuration and accepts connections without any 
> authentication.
> So, when I start the Kafka brokers I see an auth-failed error in the logs, but 
> then the ZK client falls back to a connection without authentication and 
> everything works fine.
> {code:java}
> [2021-12-16 14:04:08,451] INFO [ZooKeeperClient Kafka server] Waiting until 
> connected. (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:04:08,462] WARN SASL configuration failed: 
> javax.security.auth.login.LoginException: No JAAS configuration section named 
> 'Client' was found in specified JAAS configuration file: 
> '/etc/kafka/kafka_server_jaas.conf'. Will continue connection to Zookeeper 
> server without SASL authentication, if Zookeeper server allows it. 
> (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:04:08,466] DEBUG [ZooKeeperClient Kafka server] Received 
> event: WatchedEvent state:AuthFailed type:None path:null 
> (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:04:08,467] ERROR [ZooKeeperClient Kafka server] Auth failed. 
> (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:04:08,470] INFO Opening socket connection to server 
> ===ZOOKEEPER_HOST_1===:2181 (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:04:08,478] INFO Socket connection established, initiating 
> session, client: /===BROKER_2_IP===:48022, server: 
> ===ZOOKEEPER_HOST_1===:2181 (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:04:08,496] INFO Session establishment complete on server 
> ===ZOOKEEPER_HOST_1===:2181, sessionid = 0x10028c111930125, negotiated 
> timeout = 18000 (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:04:08,497] DEBUG [ZooKeeperClient Kafka server] Received 
> event: WatchedEvent state:SyncConnected type:None path:null 
> (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:04:08,498] INFO [ZooKeeperClient Kafka server] Connected. 
> (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:04:08,753] INFO [feature-zk-node-event-process-thread]: 
> Starting 
> (kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread)
>  {code}
> Then I restart the ZK node that holds the connection with the Kafka 
> controller node (===ZOOKEEPER_HOST_1===), or it restarts for some other 
> reason, and the logs show the following:
> {code:java}
> [2021-12-16 14:08:42,951] INFO Unable to read additional data from server 
> sessionid 0x10028c111930126, likely server has closed socket, closing socket 
> connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:08:42,951] INFO Unable to read additional data from server 
> sessionid 0x10028c111930125, likely server has closed socket, closing socket 
> connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:08:43,052] DEBUG [ZooKeeperClient ACL authorizer] Received 
> event: WatchedEvent state:Disconnected type:None path:null 
> (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:08:43,053] DEBUG [ZooKeeperClient Kafka server] Received 
> event: WatchedEvent state:Disconnected type:None path:null 
> (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:08:43,583] WARN SASL configuration failed: 
> javax.security.auth.login.LoginException: No JAAS configuration section named 
> 'Client' was found in specified JAAS configuration file: 
> '/etc/kafka/kafka_server_jaas.conf'. Will continue connection to Zookeeper 
> server without SASL authentication, if Zookeeper server allows it. 
> (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:08:43,583] INFO Opening socket connection to server 
> ===ZOOKEEPER_HOST_3===:2181 (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:08:43,583] DEBUG [ZooKeeperClient ACL authorizer] Received 
> event: WatchedEvent state:AuthFailed type:None path:null 
> (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:08:43,584] ERROR [ZooKeeperClient ACL authorizer] Auth failed. 
> (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:08:43,593] INFO Socket connection established, initiating 
> session, client: /===BROKER_2_IP===:48560, server: 
> ===ZOOKEEPER_HOST_3===:2181 (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:08:43,808] INFO Unable to read additional data from server 
> sessionid 0x10028c111930126, likely server has closed socket, closing socket 
> connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:08:43,910] DEBUG [ZooKeeperClient ACL authorizer] Received 
> event: WatchedEvent state:Disconnected type:None path:null 
> (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:08:44,019] WARN SASL configuration failed: 
> javax.security.auth.login.LoginException: No JAAS configuration section named 
> 'Client' was found in specified JAAS configuration file: 
> '/etc/kafka/kafka_server_jaas.conf'. Will continue connection to Zookeeper 
> server without SASL authentication, if Zookeeper server allows it. 
> (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:08:44,019] INFO Opening socket connection to server 
> ===ZOOKEEPER_HOST_2===:2181 (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:08:44,019] DEBUG [ZooKeeperClient Kafka server] Received 
> event: WatchedEvent state:AuthFailed type:None path:null 
> (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:08:44,019] ERROR [ZooKeeperClient Kafka server] Auth failed. 
> (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:08:44,022] INFO Socket connection established, initiating 
> session, client: /===BROKER_2_IP===:37734, server: 
> ===ZOOKEEPER_HOST_2===:2181 (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:08:44,023] INFO Unable to read additional data from server 
> sessionid 0x10028c111930125, likely server has closed socket, closing socket 
> connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:08:44,123] DEBUG [ZooKeeperClient Kafka server] Received 
> event: WatchedEvent state:Disconnected type:None path:null 
> (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:08:44,124] INFO [ZooKeeperClient Kafka server] Waiting until 
> connected. (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:08:44,125] INFO [ZooKeeperClient Kafka server] Waiting until 
> connected. (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:08:44,586] INFO [ZooKeeperClient ACL authorizer] Reinitializing 
> due to auth failure. (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:08:44,704] WARN SASL configuration failed: 
> javax.security.auth.login.LoginException: No JAAS configuration section named 
> 'Client' was found in specified JAAS configuration file: 
> '/etc/kafka/kafka_server_jaas.conf'. Will continue connection to Zookeeper 
> server without SASL authentication, if Zookeeper server allows it. 
> (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:08:44,704] INFO Opening socket connection to server 
> ===ZOOKEEPER_HOST_2===:2181 (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:08:44,705] DEBUG [ZooKeeperClient ACL authorizer] Received 
> event: WatchedEvent state:AuthFailed type:None path:null 
> (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:08:44,705] ERROR [ZooKeeperClient ACL authorizer] Auth failed. 
> (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:08:44,707] INFO Socket connection established, initiating 
> session, client: /===BROKER_2_IP===:37738, server: 
> ===ZOOKEEPER_HOST_2===:2181 (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:08:44,709] INFO Session establishment complete on server 
> ===ZOOKEEPER_HOST_2===:2181, sessionid = 0x10028c111930126, negotiated 
> timeout = 18000 (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:08:44,710] DEBUG [ZooKeeperClient ACL authorizer] Received 
> event: WatchedEvent state:SyncConnected type:None path:null 
> (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:08:44,798] WARN SASL configuration failed: 
> javax.security.auth.login.LoginException: No JAAS configuration section named 
> 'Client' was found in specified JAAS configuration file: 
> '/etc/kafka/kafka_server_jaas.conf'. Will continue connection to Zookeeper 
> server without SASL authentication, if Zookeeper server allows it. 
> (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:08:44,798] INFO Opening socket connection to server 
> ===ZOOKEEPER_HOST_3===:2181 (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:08:44,798] DEBUG [ZooKeeperClient Kafka server] Received 
> event: WatchedEvent state:AuthFailed type:None path:null 
> (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:08:44,798] ERROR [ZooKeeperClient Kafka server] Auth failed. 
> (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:08:44,807] INFO Socket connection established, initiating 
> session, client: /===BROKER_2_IP===:48568, server: 
> ===ZOOKEEPER_HOST_3===:2181 (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:08:44,824] INFO Session establishment complete on server 
> ===ZOOKEEPER_HOST_3===:2181, sessionid = 0x10028c111930125, negotiated 
> timeout = 18000 (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:08:44,824] DEBUG [ZooKeeperClient Kafka server] Received 
> event: WatchedEvent state:SyncConnected type:None path:null 
> (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:08:44,824] INFO [ZooKeeperClient Kafka server] Connected. 
> (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:08:44,824] INFO [ZooKeeperClient Kafka server] Connected. 
> (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:08:45,020] INFO [ZooKeeperClient Kafka server] Reinitializing 
> due to auth failure. (kafka.zookeeper.ZooKeeperClient) {code}
> The main points here are:
> 1. The log above is from the Kafka controller node.
> 2. The connection to ZK was lost: _"Received event: WatchedEvent 
> state:Disconnected"_
> 3. The connection was re-established with live ZK nodes: _"Opening socket 
> connection to server ===ZOOKEEPER_HOST_3===:2181"_ and _"Received event: 
> WatchedEvent state:SyncConnected"_
> 4. During the connection process we get the _"Auth failed."_ error, as we did 
> when the brokers started.
> 5. The "Auth failed" error is caught in 
> *kafka.zookeeper.ZooKeeperClient.ZooKeeperClientWatcher.process*, which calls 
> this code:
> {code:java}
> if (initialized)
>     scheduleReinitialize("auth-failed", "Reinitializing due to auth failure.",
>       RetryBackoffMs) {code}
> where RetryBackoffMs is 1000.
> One second after the "Auth failed" message we see the message 
> "{_}Reinitializing due to auth failure.{_}" in the log.
> The method *ZooKeeperClient.scheduleReinitialize* calls 
> {*}ZooKeeperClient.reinitialize{*}.
>  
> {code:java}
>   private def reinitialize(): Unit = {
>     // Initialization callbacks are invoked outside of the lock to avoid
>     // deadlock potential since their completion may require additional
>     // Zookeeper requests, which will block to acquire the initialization lock
>     stateChangeHandlers.values.foreach(callBeforeInitializingSession _)
>
>     inWriteLock(initializationLock) {
>       if (!connectionState.isAlive) {
>         zooKeeper.close()
>         info(s"Initializing a new session to $connectString.")
>         // retry forever until ZooKeeper can be instantiated
>         var connected = false
>         while (!connected) {
>           try {
>             zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs,
>               ZooKeeperClientWatcher, clientConfig)
>             connected = true
>           } catch {
>             case e: Exception =>
>               info("Error when recreating ZooKeeper, retrying after a short sleep", e)
>               Thread.sleep(RetryBackoffMs)
>           }
>         }
>       }
>     }
>
>     stateChangeHandlers.values.foreach(callAfterInitializingSession _)
>   } {code}
> The code inside the "{_}inWriteLock(initializationLock){_}" block does not 
> run, because the connection was already established with live ZK nodes (I 
> didn't find the message s"{_}Initializing a new session to 
> $connectString.{_}" in the logs).
> But "{*}callBeforeInitializingSession{*}" calls 
> "{*}KafkaController.startup.beforeInitializingSession{*}", which fires the 
> "{*}ControllerEvent.Expire{*}" event, and 
> "{*}callAfterInitializingSession{*}" calls 
> "{*}KafkaController.startup.afterInitializingSession{*}", which fires the 
> "{*}ControllerEvent.RegisterBrokerAndReelect{*}" event.
> The "{*}ControllerEvent.Expire{*}" event calls the 
> "{*}KafkaController.processExpire{*}" method, which shuts down the current 
> controller, and the logs show the following:
> {code:java}
> [2021-12-16 14:08:45,022] DEBUG [Controller id=2] Resigning 
> (kafka.controller.KafkaController)
> [2021-12-16 14:08:45,022] DEBUG [Controller id=2] Unregister 
> BrokerModifications handler for Set(1, 2, 3) 
> (kafka.controller.KafkaController)
> [2021-12-16 14:08:45,023] INFO [PartitionStateMachine controllerId=2] Stopped 
> partition state machine (kafka.controller.ZkPartitionStateMachine)
> [2021-12-16 14:08:45,025] INFO [ReplicaStateMachine controllerId=2] Stopped 
> replica state machine (kafka.controller.ZkReplicaStateMachine)
> [2021-12-16 14:08:45,026] INFO [RequestSendThread controllerId=2] Shutting 
> down (kafka.controller.RequestSendThread)
> [2021-12-16 14:08:45,026] INFO [RequestSendThread controllerId=2] Stopped 
> (kafka.controller.RequestSendThread)
> [2021-12-16 14:08:45,026] INFO [RequestSendThread controllerId=2] Shutdown 
> completed (kafka.controller.RequestSendThread)
> [2021-12-16 14:08:45,031] INFO [RequestSendThread controllerId=2] Shutting 
> down (kafka.controller.RequestSendThread)
> [2021-12-16 14:08:45,032] INFO [RequestSendThread controllerId=2] Stopped 
> (kafka.controller.RequestSendThread)
> [2021-12-16 14:08:45,033] INFO [RequestSendThread controllerId=2] Shutdown 
> completed (kafka.controller.RequestSendThread)
> [2021-12-16 14:08:45,037] INFO [RequestSendThread controllerId=2] Shutting 
> down (kafka.controller.RequestSendThread)
> [2021-12-16 14:08:45,037] INFO [RequestSendThread controllerId=2] Stopped 
> (kafka.controller.RequestSendThread)
> [2021-12-16 14:08:45,037] INFO [RequestSendThread controllerId=2] Shutdown 
> completed (kafka.controller.RequestSendThread)
> [2021-12-16 14:08:45,040] INFO [Controller id=2] Resigned 
> (kafka.controller.KafkaController) {code}
> Then the "{*}ControllerEvent.RegisterBrokerAndReelect{*}" event calls the 
> method "{*}KafkaController.processRegisterBrokerAndReelect{*}" => 
> "{*}KafkaController.elect{*}". But the logs show the following:
>  
> {code:java}
> [2021-12-16 14:08:45,129] DEBUG [Controller id=2] Broker 2 has been elected 
> as the controller, so stopping the election process. 
> (kafka.controller.KafkaController) {code}
> This is the part of the *"KafkaController.elect"* method that writes the 
> message above:
> {code:java}
>   private def elect(): Unit = {
>     activeControllerId = zkClient.getControllerId.getOrElse(-1)
>     /*
>      * We can get here during the initial startup and the handleDeleted ZK
>      * callback. Because of the potential race condition, it's possible that
>      * the controller has already been elected when we get here. This check
>      * will prevent the following createEphemeralPath method from getting into
>      * an infinite loop if this broker is already the controller.
>      */
>     if (activeControllerId != -1) {
>       debug(s"Broker $activeControllerId has been elected as the controller, so stopping the election process.")
>       return
>     } {code}
> The problem is:
> The current controller was shut down by the 
> "{*}KafkaController.processExpire{*}" event, but a new one wasn't elected 
> because ZK didn't remove the /controller node.
> So the logs show that Controller id=2 resigned and then stopped the election 
> because it thinks it is still the live controller, even though all of its ZK 
> listeners have already been shut down.
> If I then restart any non-controller broker (or one restarts for any 
> reason), it doesn't get metadata because there is no controller in the 
> cluster.
> If we try to consume data from such a broker, we get the following errors:
> {code:java}
> /opt/kafka/bin/kafka-console-consumer.sh --topic test_topic --from-beginning 
> --bootstrap-server  $(hostname):9091 --consumer.config ~/connect.properties 
> WARN [Consumer clientId=consumer-console-consumer-57293-1, 
> groupId=console-consumer-57293] Error while fetching metadata with 
> correlation id 2 : {test_topic=UNKNOWN_TOPIC_OR_PARTITION} 
> (org.apache.kafka.clients.NetworkClient)
> WARN [Consumer clientId=consumer-console-consumer-57293-1, 
> groupId=console-consumer-57293] Error while fetching metadata with 
> correlation id 3 : {test_topic=UNKNOWN_TOPIC_OR_PARTITION} 
> (org.apache.kafka.clients.NetworkClient){code}
> UPD: Workaround for this bug:
>  # Set up a proper SASL connection for ZK
>  # Disable the SASL connection via the property "-Dzookeeper.sasl.client=false"
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
