Hi Community guys,

Greetings to you!

I'm setting up a Kafka KRaft cluster in containers using the apache/kafka:3.9.0 Docker 
image, with SASL_PLAINTEXT enabled for inter-broker and inter-controller 
communication. When I set up the cluster and configured the ACLs, I set the user 
'kafka' (which is used for inter-broker and inter-controller SASL authentication) 
as a super user. Everything looked fine, and the container cluster configuration 
was persisted to host folders.

# docker-compose file
============================================
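services:
  kafka-1:
    image: 'apache/kafka:3.9.0'
    hostname: kafka-1
    container_name: kafka-1
    environment:
      # Broker configuration
      # Node id is quoted so no YAML/compose layer retypes it; the value the
      # container receives is the same string "1".
      KAFKA_NODE_ID: '1'
      KAFKA_LOG_DIRS: '/var/lib/kafka/data'
      KAFKA_PROCESS_ROLES: 'broker,controller'
      # Values below are kept on one line: a value continued at column 0 on
      # the next line is not a valid YAML continuation of the key above it.
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka-1:19092,2@kafka-2:19092,3@kafka-3:19092'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'INTERNAL:SASL_PLAINTEXT,CONTROLLER:SASL_PLAINTEXT'
      KAFKA_LISTENERS: 'CONTROLLER://:19092,INTERNAL://:9092'
      KAFKA_ADVERTISED_LISTENERS: 'INTERNAL://kafka-1:9092'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'INTERNAL'
      # SASL
      # PLAIN over SASL_PLAINTEXT carries the password unencrypted on the wire,
      # so this relies on the compose network itself being trusted. The
      # credential is also committed literally in this file; an env file or
      # compose secret keeps it out of version control.
      KAFKA_LISTENER_NAME_INTERNAL_PLAIN_SASL_JAAS_CONFIG: 'org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="kafka-secret" user_kafka="kafka-secret";'
      KAFKA_LISTENER_NAME_CONTROLLER_PLAIN_SASL_JAAS_CONFIG: 'org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="kafka-secret" user_kafka="kafka-secret";'
      KAFKA_SASL_ENABLED_MECHANISMS: 'PLAIN'
      KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: 'PLAIN'
      KAFKA_SASL_MECHANISM_CONTROLLER_PROTOCOL: 'PLAIN'
      # ACLs Configuration
      KAFKA_AUTHORIZER_CLASS_NAME: 'org.apache.kafka.metadata.authorizer.StandardAuthorizer'
      # NOTE(review): User:kafka is also the principal on the CONTROLLER
      # listener; with this line removed the controller log below reports
      # AuthorizerNotReadyException on VOTE requests, which suggests the
      # controller principal is still needed here — confirm against KIP-801.
      KAFKA_SUPER_USERS: 'User:kafka;User:kafka_admin'
      KAFKA_PRINCIPAL_BUILDER_CLASS: 'org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder'
    volumes:
      # Mount the volume to persist the acls
      - './kafka-1-data:/var/lib/kafka/data'
      # below for operation

  kafka-2:
    image: 'apache/kafka:3.9.0'
    hostname: kafka-2
    container_name: kafka-2
    environment:
      # Broker configuration
      KAFKA_NODE_ID: '2'
      KAFKA_LOG_DIRS: '/var/lib/kafka/data'
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka-1:19092,2@kafka-2:19092,3@kafka-3:19092'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'INTERNAL:SASL_PLAINTEXT,CONTROLLER:SASL_PLAINTEXT'
      KAFKA_LISTENERS: 'CONTROLLER://:19092,INTERNAL://:9092'
      KAFKA_ADVERTISED_LISTENERS: 'INTERNAL://kafka-2:9092'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'INTERNAL'
      # SASL (same credential and caveats as kafka-1)
      KAFKA_LISTENER_NAME_INTERNAL_PLAIN_SASL_JAAS_CONFIG: 'org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="kafka-secret" user_kafka="kafka-secret";'
      KAFKA_LISTENER_NAME_CONTROLLER_PLAIN_SASL_JAAS_CONFIG: 'org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="kafka-secret" user_kafka="kafka-secret";'
      KAFKA_SASL_ENABLED_MECHANISMS: 'PLAIN'
      KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: 'PLAIN'
      KAFKA_SASL_MECHANISM_CONTROLLER_PROTOCOL: 'PLAIN'
      # ACLs Configuration
      KAFKA_AUTHORIZER_CLASS_NAME: 'org.apache.kafka.metadata.authorizer.StandardAuthorizer'
      KAFKA_SUPER_USERS: 'User:kafka;User:kafka_admin'
      KAFKA_PRINCIPAL_BUILDER_CLASS: 'org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder'
    volumes:
      - './kafka-2-data:/var/lib/kafka/data'

  kafka-3:
    image: 'apache/kafka:3.9.0'
    hostname: kafka-3
    container_name: kafka-3
    environment:
      # Broker configuration
      KAFKA_NODE_ID: '3'
      KAFKA_LOG_DIRS: '/var/lib/kafka/data'
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka-1:19092,2@kafka-2:19092,3@kafka-3:19092'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'INTERNAL:SASL_PLAINTEXT,CONTROLLER:SASL_PLAINTEXT'
      KAFKA_LISTENERS: 'CONTROLLER://:19092,INTERNAL://:9092'
      KAFKA_ADVERTISED_LISTENERS: 'INTERNAL://kafka-3:9092'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'INTERNAL'
      # SASL (same credential and caveats as kafka-1)
      KAFKA_LISTENER_NAME_INTERNAL_PLAIN_SASL_JAAS_CONFIG: 'org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="kafka-secret" user_kafka="kafka-secret";'
      KAFKA_LISTENER_NAME_CONTROLLER_PLAIN_SASL_JAAS_CONFIG: 'org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="kafka-secret" user_kafka="kafka-secret";'
      KAFKA_SASL_ENABLED_MECHANISMS: 'PLAIN'
      KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: 'PLAIN'
      KAFKA_SASL_MECHANISM_CONTROLLER_PROTOCOL: 'PLAIN'
      # ACLs Configuration
      KAFKA_AUTHORIZER_CLASS_NAME: 'org.apache.kafka.metadata.authorizer.StandardAuthorizer'
      KAFKA_SUPER_USERS: 'User:kafka;User:kafka_admin'
      KAFKA_PRINCIPAL_BUILDER_CLASS: 'org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder'
    volumes:
      - './kafka-3-data:/var/lib/kafka/data'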
============================================

However, after I completed the cluster setup, I removed the super user and then 
destroyed and recreated the containers. The cluster was not able to start, and I got 
the errors below:

============================================
[2024-12-23 08:17:09,909] ERROR [ControllerApis nodeId=1] Unexpected error 
handling request RequestHeader(apiKey=VOTE, apiVersion=1, 
clientId=raft-client-2, correlationId=1072, headerVersion=2) -- 
VoteRequestData(clusterId='5L6g3nShT-eMCtK--X86sw', voterId=1, 
topics=[TopicData(topicName='__cluster_metadata', 
partitions=[PartitionData(partitionIndex=0, candidateEpoch=17, candidateId=2, 
candidateDirectoryId=Tfznlo485OmgyP7bbT1g-Q, 
voterDirectoryId=AAAAAAAAAAAAAAAAAAAAAA, lastOffsetEpoch=12, 
lastOffset=3204)])]) with context 
RequestContext(header=RequestHeader(apiKey=VOTE, apiVersion=1, 
clientId=raft-client-2, correlationId=1072, headerVersion=2), 
connectionId='10.89.5.2:19092-10.89.5.4:35672-0', clientAddress=/10.89.5.4, 
principal=User:kafka, listenerName=ListenerName(CONTROLLER), 
securityProtocol=SASL_PLAINTEXT, 
clientInformation=ClientInformation(softwareName=apache-kafka-java, 
softwareVersion=3.9.0), fromPrivilegedListener=false, 
principalSerde=Optional[org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder@151df65b])
 (kafka.server.ControllerApis)
org.apache.kafka.common.errors.AuthorizerNotReadyException
============================================

I double-checked the ACLs and believe they are fine; see below:

============================================
kafka-1:/$ $KAFKA_ACLS --bootstrap-server $BOOTSTRAP_SERVER \
    --command-config $ACL_CONFIG_FILE \
    --list
Current ACLs for resource `ResourcePattern(resourceType=TRANSACTIONAL_ID, 
name=*, patternType=LITERAL)`:
        (principal=User:kafka, host=*, operation=ALL, permissionType=ALLOW)
        (principal=User:kafka_admin, host=*, operation=ALL, 
permissionType=ALLOW)

Current ACLs for resource `ResourcePattern(resourceType=TOPIC, 
name=__cluster_metadata, patternType=LITERAL)`:
        (principal=User:kafka, host=*, operation=ALL, permissionType=ALLOW)

Current ACLs for resource `ResourcePattern(resourceType=GROUP, name=*, 
patternType=LITERAL)`:
        (principal=User:kafka, host=*, operation=ALL, permissionType=ALLOW)
        (principal=User:kafka_admin, host=*, operation=ALL, 
permissionType=ALLOW)

Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=*, 
patternType=LITERAL)`:
        (principal=User:kafka, host=*, operation=ALL, permissionType=ALLOW)
        (principal=User:kafka_admin, host=*, operation=ALL, 
permissionType=ALLOW)

Current ACLs for resource `ResourcePattern(resourceType=CLUSTER, 
name=kafka-cluster, patternType=LITERAL)`:
        (principal=User:kafka, host=*, operation=ALL, permissionType=ALLOW)
        (principal=User:kafka_admin, host=*, operation=ALL, 
permissionType=ALLOW)
============================================

According to 
KIP-801(https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=195728007#KIP801:ImplementanAuthorizerthatstoresmetadatain__cluster_metadata-Bootstrapping),
 a super user should only be needed during the cluster's first-time initialization, so 
my understanding is that once the ACLs have been configured properly, the super user can 
be removed — but that does not seem to be the case.

You should be able to reproduce this issue by commenting out the super user lines:
KAFKA_SUPER_USERS: "User:kafka;User:kafka_admin"

It would be greatly appreciated if you could help with this; thanks in advance!

Best Regards/
Han
