hachikuji commented on a change in pull request #11503:
URL: https://github.com/apache/kafka/pull/11503#discussion_r764232658



##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -1959,10 +1962,26 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
     }
   }
 
-  def listenerSecurityProtocolMap: Map[ListenerName, SecurityProtocol] = {
-    getMap(KafkaConfig.ListenerSecurityProtocolMapProp, 
getString(KafkaConfig.ListenerSecurityProtocolMapProp))
+  def effectiveListenerSecurityProtocolMap: Map[ListenerName, 
SecurityProtocol] = {
+    val mapValue = getMap(KafkaConfig.ListenerSecurityProtocolMapProp, 
getString(KafkaConfig.ListenerSecurityProtocolMapProp))
       .map { case (listenerName, protocolName) =>
-      ListenerName.normalised(listenerName) -> 
getSecurityProtocol(protocolName, KafkaConfig.ListenerSecurityProtocolMapProp)
+        ListenerName.normalised(listenerName) -> 
getSecurityProtocol(protocolName, KafkaConfig.ListenerSecurityProtocolMapProp)
+      }
+    if (usesSelfManagedQuorum && 
!originals.containsKey(ListenerSecurityProtocolMapProp)) {
+      // Nothing was specified explicitly for listener.security.protocol.map, 
so we are using the default value,
+      // and we are using KRaft.
+      // Add PLAINTEXT mappings for controller listeners as long as there is 
no SSL or SASL_{PLAINTEXT,SSL} in use
+      def isSslOrSasl(name: String) : Boolean = 
name.equals(SecurityProtocol.SSL.name) || 
name.equals(SecurityProtocol.SASL_SSL.name) || 
name.equals(SecurityProtocol.SASL_PLAINTEXT.name)
+      if (controllerListenerNames.exists(isSslOrSasl) ||

Review comment:
       Checking my understanding. Is the first clause here necessary for the 
broker-only case in which the controller listener names are not included in 
`listeners`? A comment to that effect that might be useful.

##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -2018,12 +2025,103 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
       "offsets.commit.required.acks must be greater or equal -1 and less or 
equal to offsets.topic.replication.factor")
     require(BrokerCompressionCodec.isValid(compressionType), "compression.type 
: " + compressionType + " is not valid." +
       " Valid options are " + 
BrokerCompressionCodec.brokerCompressionOptions.mkString(","))
-    require(!processRoles.contains(ControllerRole) || 
controllerListeners.nonEmpty,
-      s"${KafkaConfig.ControllerListenerNamesProp} cannot be empty if the 
server has the controller role")
+    val advertisedListenerNames = 
effectiveAdvertisedListeners.map(_.listenerName).toSet
+
+    // validate KRaft-related configs
+    val voterAddressSpecsByNodeId = 
RaftConfig.parseVoterConnections(quorumVoters)
+    def validateNonEmptyQuorumVotersForKRaft(): Unit = {
+      if (voterAddressSpecsByNodeId.isEmpty) {
+        throw new ConfigException(s"If using ${KafkaConfig.ProcessRolesProp}, 
${KafkaConfig.QuorumVotersProp} must contain a parseable set of voters.")
+      }
+    }
+    def validateControlPlaneListenerEmptyForKRaft(): Unit = {
+      require(controlPlaneListenerName.isEmpty,
+        s"${KafkaConfig.ControlPlaneListenerNameProp} is not supported in 
KRaft mode. KRaft uses ${KafkaConfig.ControllerListenerNamesProp} instead.")
+    }
+    val sourceOfAdvertisedListeners: String =
+      if (getString(KafkaConfig.AdvertisedListenersProp) != null)
+        s"${KafkaConfig.AdvertisedListenersProp}"
+      else
+        s"${KafkaConfig.ListenersProp}"
+    def 
validateAdvertisedListenersDoesNotContainControllerListenersForKRaftBroker(): 
Unit = {
+      require(!advertisedListenerNames.exists(aln => 
controllerListenerNames.contains(aln.value())),
+        s"$sourceOfAdvertisedListeners must not contain KRaft controller 
listeners from ${KafkaConfig.ControllerListenerNamesProp} when 
${KafkaConfig.ProcessRolesProp} contains the broker role because Kafka clients 
that send requests via advertised listeners do not send requests to KRaft 
controllers -- they only send requests to KRaft brokers.")

Review comment:
       nit: The generality here seems misleading. We explicitly allow the 
controller listeners to be included among `listeners` even when 
`advertised.listeners` is empty. Hence I don't think it would ever be possible 
for `sourceOfAdvertisedListeners` to refer to `listeners` here. 

##########
File path: core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
##########
@@ -1000,46 +1139,78 @@ class KafkaConfigTest {
     }
   }
 
-  def assertDistinctControllerAndAdvertisedListeners(): Unit = {
-    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port 
= TestUtils.MockZkPort)
-    val listeners = "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094"
-    props.put(KafkaConfig.ListenersProp, listeners)
-    props.put(KafkaConfig.AdvertisedListenersProp, 
"PLAINTEXT://A:9092,SSL://B:9093")
+  @Test
+  def assertDistinctControllerAndAdvertisedListenersAllowedForKRaftBroker(): 
Unit = {
+    val props = new Properties()
+    props.put(KafkaConfig.ProcessRolesProp, "broker")
+    props.put(KafkaConfig.ListenersProp, 
"PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094")
+    props.put(KafkaConfig.AdvertisedListenersProp, 
"PLAINTEXT://A:9092,SSL://B:9093") // explicitly setting it in KRaft
+    props.put(KafkaConfig.ControllerListenerNamesProp, "SASL_SSL")
+    props.put(KafkaConfig.NodeIdProp, "2")
+    props.put(KafkaConfig.QuorumVotersProp, "3@localhost:9094")
+
+    // invalid due to extra listener also appearing in controller listeners
+    assertBadConfigContainingMessage(props,
+      "controller.listener.names must not contain a value appearing in the 
'listeners' configuration when running KRaft with just the broker role")
+
     // Valid now
-    assertTrue(isValidKafkaConfig(props))
+    props.put(KafkaConfig.ListenersProp, "PLAINTEXT://A:9092,SSL://B:9093")
+    KafkaConfig.fromProps(props)
 
-    // Still valid
-    val controllerListeners = "SASL_SSL"
-    props.put(KafkaConfig.ControllerListenerNamesProp, controllerListeners)
-    assertTrue(isValidKafkaConfig(props))
+    // Also valid if we let advertised listeners be derived from 
listeners/controller.listener.names
+    // since listeners and advertised.listeners are explicitly identical at 
this point
+    props.remove(KafkaConfig.AdvertisedListenersProp)
+    KafkaConfig.fromProps(props)
   }
 
   @Test
-  def assertAllControllerListenerCannotBeAdvertised(): Unit = {
-    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port 
= TestUtils.MockZkPort)
+  def assertControllerListenersCannotBeAdvertisedForKRaftBroker(): Unit = {
+    val props = new Properties()
+    props.put(KafkaConfig.ProcessRolesProp, "broker,controller")
     val listeners = "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094"
     props.put(KafkaConfig.ListenersProp, listeners)
-    props.put(KafkaConfig.AdvertisedListenersProp, listeners)
+    props.put(KafkaConfig.AdvertisedListenersProp, listeners) // explicitly 
setting it in KRaft
+    props.put(KafkaConfig.InterBrokerListenerNameProp, "SASL_SSL")
+    props.put(KafkaConfig.ControllerListenerNamesProp, "PLAINTEXT,SSL")
+    props.put(KafkaConfig.NodeIdProp, "2")
+    props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9092")
+    assertBadConfigContainingMessage(props,
+      "advertised.listeners must not contain KRaft controller listeners from 
controller.listener.names when process.roles contains the broker role")
+
     // Valid now
-    assertTrue(isValidKafkaConfig(props))
+    props.put(KafkaConfig.AdvertisedListenersProp, "SASL_SSL://C:9094")
+    KafkaConfig.fromProps(props)
 
-    // Invalid now
-    props.put(KafkaConfig.ControllerListenerNamesProp, 
"PLAINTEXT,SSL,SASL_SSL")
-    assertFalse(isValidKafkaConfig(props))
+    // Also valid if we allow advertised listeners to derive from 
listeners/controller.listener.names
+    props.remove(KafkaConfig.AdvertisedListenersProp)
+    KafkaConfig.fromProps(props)
   }
 
   @Test
-  def assertEvenOneControllerListenerCannotBeAdvertised(): Unit = {
-    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port 
= TestUtils.MockZkPort)
+  def assertAdvertisedListenersDisallowedForKRaftControllerOnlyRole(): Unit = {

Review comment:
       I might have missed it, but do we have a test for the scenario where 
SSL/SASL are in use and no controller listener security mapping is defined?

##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -2011,12 +2012,68 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
       "offsets.commit.required.acks must be greater or equal -1 and less or 
equal to offsets.topic.replication.factor")
     require(BrokerCompressionCodec.isValid(compressionType), "compression.type 
: " + compressionType + " is not valid." +
       " Valid options are " + 
BrokerCompressionCodec.brokerCompressionOptions.mkString(","))
-    require(!processRoles.contains(ControllerRole) || 
controllerListeners.nonEmpty,
-      s"${KafkaConfig.ControllerListenerNamesProp} cannot be empty if the 
server has the controller role")
-
     val advertisedListenerNames = advertisedListeners.map(_.listenerName).toSet
+    if (usesSelfManagedQuorum) {
+      require(controlPlaneListenerName.isEmpty,
+        s"${KafkaConfig.ControlPlaneListenerNameProp} is not supported in 
KRaft mode.")
+      val sourceOfAdvertisedListeners = if 
(getString(KafkaConfig.AdvertisedListenersProp) != null)
+        s"${KafkaConfig.AdvertisedListenersProp}"
+      else
+        s"${KafkaConfig.ListenersProp}"
+      if (!processRoles.contains(BrokerRole)) {
+        // advertised listeners must be empty when not also running the broker 
role
+        require(advertisedListeners.isEmpty,
+          sourceOfAdvertisedListeners +
+            s" must only contain KRaft controller listeners from 
${KafkaConfig.ControllerListenerNamesProp} when 
${KafkaConfig.ProcessRolesProp}=controller")
+      } else {
+        // when running broker role advertised listeners cannot contain 
controller listeners
+        require(!advertisedListenerNames.exists(aln => 
controllerListenerNames.contains(aln.value())),
+          sourceOfAdvertisedListeners +
+            s" must not contain KRaft controller listeners from 
${KafkaConfig.ControllerListenerNamesProp} when ${KafkaConfig.ProcessRolesProp} 
contains the broker role")
+      }
+      if (processRoles.contains(ControllerRole)) {
+        // has controller role (and optionally broker role as well)
+        // controller.listener.names must be non-empty
+        // every one must appear in listeners
+        // the port appearing in controller.quorum.voters for this node must 
match the port of the first controller listener
+        // (we allow other nodes' voter ports to differ to support running 
multiple controllers on the same host)
+        require(controllerListeners.nonEmpty,
+          s"${KafkaConfig.ControllerListenerNamesProp} must contain at least 
one value appearing in the '${KafkaConfig.ListenersProp}' configuration when 
running the KRaft controller role")
+        val listenerNameValues = listeners.map(_.listenerName.value).toSet
+        require(controllerListenerNames.forall(cln => 
listenerNameValues.contains(cln)),
+          s"${KafkaConfig.ControllerListenerNamesProp} must only contain 
values appearing in the '${KafkaConfig.ListenersProp}' configuration when 
running the KRaft controller role")
+        val addressSpecForThisNode = 
RaftConfig.parseVoterConnections(quorumVoters).get(nodeId)
+        addressSpecForThisNode match {
+          case inetAddressSpec: RaftConfig.InetAddressSpec => {
+            val quorumVotersPort = inetAddressSpec.address.getPort
+            require(controllerListeners.head.port == quorumVotersPort,
+              s"Port in ${KafkaConfig.QuorumVotersProp} for this controller 
node (${KafkaConfig.NodeIdProp}=$nodeId, port=$quorumVotersPort) does not match 
the port for the first controller listener in 
${KafkaConfig.ControllerListenerNamesProp} 
(${controllerListeners.head.listenerName.value()}, 
port=${controllerListeners.head.port})")
+          }
+          case _ =>
+        }
+      } else {
+        // only broker role
+        // controller.listener.names must be non-empty
+        // none of them can appear in listeners
+        // warn that only the first one is used if there is more than one
+        require(controllerListenerNames.exists(_.nonEmpty),
+          s"${KafkaConfig.ControllerListenerNamesProp} must contain at least 
one value when running KRaft with just the broker role")
+        if (controllerListenerNames.size > 1) {
+          warn(s"${KafkaConfig.ControllerListenerNamesProp} has multiple 
entries; only the first will be used since 
${KafkaConfig.ProcessRolesProp}=broker: $controllerListenerNames")
+        }
+        require(controllerListeners.isEmpty,
+          s"${KafkaConfig.ControllerListenerNamesProp} must not contain a 
value appearing in the '${KafkaConfig.ListenersProp}' configuration when 
running KRaft with just the broker role")
+      }
+    } else {
+      // controller listener names must be empty when not in KRaft mode
+      require(!controllerListenerNames.exists(_.nonEmpty), 
s"${KafkaConfig.ControllerListenerNamesProp} must be empty when not running in 
KRaft mode: ${controllerListenerNames.asJava.toString}")

Review comment:
       Ok, at least the `toString` is redundant though.

##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -748,7 +749,8 @@ object KafkaConfig {
     "Different security (SSL and SASL) settings can be configured for each 
listener by adding a normalised " +
     "prefix (the listener name is lowercased) to the config name. For example, 
to set a different keystore for the " +
     "INTERNAL listener, a config with name 
<code>listener.name.internal.ssl.keystore.location</code> would be set. " +
-    "If the config for the listener name is not set, the config will fallback 
to the generic config (i.e. <code>ssl.keystore.location</code>). "
+    "If the config for the listener name is not set, the config will fallback 
to the generic config (i.e. <code>ssl.keystore.location</code>). " +
+    "Note that in KRaft an additional default mapping CONTROLLER to PLAINTEXT 
is added."

Review comment:
       How about this?
   
   > Note that in KRaft, a default mapping from `controller.listener.names` to 
PLAINTEXT is assumed if no explicit mapping is provided and no other security 
protocol is in use.

##########
File path: core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
##########
@@ -1000,46 +1139,78 @@ class KafkaConfigTest {
     }
   }
 
-  def assertDistinctControllerAndAdvertisedListeners(): Unit = {
-    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port 
= TestUtils.MockZkPort)
-    val listeners = "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094"
-    props.put(KafkaConfig.ListenersProp, listeners)
-    props.put(KafkaConfig.AdvertisedListenersProp, 
"PLAINTEXT://A:9092,SSL://B:9093")
+  @Test
+  def assertDistinctControllerAndAdvertisedListenersAllowedForKRaftBroker(): 
Unit = {

Review comment:
       nit: it would be nice to keep the convention of prefixing test cases 
with "test"

##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -2018,12 +2025,103 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
       "offsets.commit.required.acks must be greater or equal -1 and less or 
equal to offsets.topic.replication.factor")
     require(BrokerCompressionCodec.isValid(compressionType), "compression.type 
: " + compressionType + " is not valid." +
       " Valid options are " + 
BrokerCompressionCodec.brokerCompressionOptions.mkString(","))
-    require(!processRoles.contains(ControllerRole) || 
controllerListeners.nonEmpty,
-      s"${KafkaConfig.ControllerListenerNamesProp} cannot be empty if the 
server has the controller role")
+    val advertisedListenerNames = 
effectiveAdvertisedListeners.map(_.listenerName).toSet
+
+    // validate KRaft-related configs
+    val voterAddressSpecsByNodeId = 
RaftConfig.parseVoterConnections(quorumVoters)
+    def validateNonEmptyQuorumVotersForKRaft(): Unit = {
+      if (voterAddressSpecsByNodeId.isEmpty) {
+        throw new ConfigException(s"If using ${KafkaConfig.ProcessRolesProp}, 
${KafkaConfig.QuorumVotersProp} must contain a parseable set of voters.")
+      }
+    }
+    def validateControlPlaneListenerEmptyForKRaft(): Unit = {
+      require(controlPlaneListenerName.isEmpty,
+        s"${KafkaConfig.ControlPlaneListenerNameProp} is not supported in 
KRaft mode. KRaft uses ${KafkaConfig.ControllerListenerNamesProp} instead.")
+    }
+    val sourceOfAdvertisedListeners: String =
+      if (getString(KafkaConfig.AdvertisedListenersProp) != null)
+        s"${KafkaConfig.AdvertisedListenersProp}"
+      else
+        s"${KafkaConfig.ListenersProp}"
+    def 
validateAdvertisedListenersDoesNotContainControllerListenersForKRaftBroker(): 
Unit = {
+      require(!advertisedListenerNames.exists(aln => 
controllerListenerNames.contains(aln.value())),
+        s"$sourceOfAdvertisedListeners must not contain KRaft controller 
listeners from ${KafkaConfig.ControllerListenerNamesProp} when 
${KafkaConfig.ProcessRolesProp} contains the broker role because Kafka clients 
that send requests via advertised listeners do not send requests to KRaft 
controllers -- they only send requests to KRaft brokers.")
+    }
+    def validateControllerQuorumVotersMustContainNodeIDForKRaftController(): 
Unit = {
+      require(voterAddressSpecsByNodeId.containsKey(nodeId),
+        s"If ${KafkaConfig.ProcessRolesProp} contains the 'controller' role, 
the node id $nodeId must be included in the set of voters 
${KafkaConfig.QuorumVotersProp}=${voterAddressSpecsByNodeId.asScala.keySet.toSet}")
+    }
+    def validateControllerListenerExistsForKRaftController(): Unit = {
+      require(controllerListeners.nonEmpty,
+        s"${KafkaConfig.ControllerListenerNamesProp} must contain at least one 
value appearing in the '${KafkaConfig.ListenersProp}' configuration when 
running the KRaft controller role")
+    }
+    def 
validateControllerListenerNamesMustAppearInListenersForKRaftController(): Unit 
= {
+      val listenerNameValues = listeners.map(_.listenerName.value).toSet
+      require(controllerListenerNames.forall(cln => 
listenerNameValues.contains(cln)),
+        s"${KafkaConfig.ControllerListenerNamesProp} must only contain values 
appearing in the '${KafkaConfig.ListenersProp}' configuration when running the 
KRaft controller role")
+    }
+    def validateAdvertisedListenersNonEmptyForBroker(): Unit = {
+      require(advertisedListenerNames.nonEmpty,
+        "There must be at least one advertised listener." + (
+          if (processRoles.contains(BrokerRole)) s" Perhaps all listeners 
appear in ${ControllerListenerNamesProp}?" else ""))
+    }
+    if (processRoles == Set(BrokerRole)) {
+      // KRaft broker-only
+      validateNonEmptyQuorumVotersForKRaft()
+      validateControlPlaneListenerEmptyForKRaft()
+      
validateAdvertisedListenersDoesNotContainControllerListenersForKRaftBroker()
+      // nodeId must not appear in controller.quorum.voters
+      require(!voterAddressSpecsByNodeId.containsKey(nodeId),
+        s"If ${KafkaConfig.ProcessRolesProp} contains just the 'broker' role, 
the node id $nodeId must not be included in the set of voters 
${KafkaConfig.QuorumVotersProp}=${voterAddressSpecsByNodeId.asScala.keySet.toSet}")
+      // controller.listener.names must be non-empty...
+      require(controllerListenerNames.exists(_.nonEmpty),
+        s"${KafkaConfig.ControllerListenerNamesProp} must contain at least one 
value when running KRaft with just the broker role")
+      // controller.listener.names are forbidden in listeners...
+      require(controllerListeners.isEmpty,
+        s"${KafkaConfig.ControllerListenerNamesProp} must not contain a value 
appearing in the '${KafkaConfig.ListenersProp}' configuration when running 
KRaft with just the broker role")
+      // controller.listener.names must all appear in 
listener.security.protocol.map
+      controllerListenerNames.filter(_.nonEmpty).foreach { name =>
+        val listenerName = ListenerName.normalised(name)
+        if (!effectiveListenerSecurityProtocolMap.contains(listenerName)) {

Review comment:
       Probably doesn't make that much of a difference here, but we should keep 
in mind that `effectiveListenerSecurityProtocolMap` recomputes the listener map 
every time it is called. It might be worth double-checking that there are not 
any cases where this could be a performance concern.

##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -2018,12 +2025,103 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
       "offsets.commit.required.acks must be greater or equal -1 and less or 
equal to offsets.topic.replication.factor")
     require(BrokerCompressionCodec.isValid(compressionType), "compression.type 
: " + compressionType + " is not valid." +
       " Valid options are " + 
BrokerCompressionCodec.brokerCompressionOptions.mkString(","))
-    require(!processRoles.contains(ControllerRole) || 
controllerListeners.nonEmpty,
-      s"${KafkaConfig.ControllerListenerNamesProp} cannot be empty if the 
server has the controller role")
+    val advertisedListenerNames = 
effectiveAdvertisedListeners.map(_.listenerName).toSet
+
+    // validate KRaft-related configs
+    val voterAddressSpecsByNodeId = 
RaftConfig.parseVoterConnections(quorumVoters)
+    def validateNonEmptyQuorumVotersForKRaft(): Unit = {
+      if (voterAddressSpecsByNodeId.isEmpty) {
+        throw new ConfigException(s"If using ${KafkaConfig.ProcessRolesProp}, 
${KafkaConfig.QuorumVotersProp} must contain a parseable set of voters.")
+      }
+    }
+    def validateControlPlaneListenerEmptyForKRaft(): Unit = {
+      require(controlPlaneListenerName.isEmpty,
+        s"${KafkaConfig.ControlPlaneListenerNameProp} is not supported in 
KRaft mode. KRaft uses ${KafkaConfig.ControllerListenerNamesProp} instead.")
+    }
+    val sourceOfAdvertisedListeners: String =
+      if (getString(KafkaConfig.AdvertisedListenersProp) != null)
+        s"${KafkaConfig.AdvertisedListenersProp}"
+      else
+        s"${KafkaConfig.ListenersProp}"
+    def 
validateAdvertisedListenersDoesNotContainControllerListenersForKRaftBroker(): 
Unit = {
+      require(!advertisedListenerNames.exists(aln => 
controllerListenerNames.contains(aln.value())),
+        s"$sourceOfAdvertisedListeners must not contain KRaft controller 
listeners from ${KafkaConfig.ControllerListenerNamesProp} when 
${KafkaConfig.ProcessRolesProp} contains the broker role because Kafka clients 
that send requests via advertised listeners do not send requests to KRaft 
controllers -- they only send requests to KRaft brokers.")
+    }
+    def validateControllerQuorumVotersMustContainNodeIDForKRaftController(): 
Unit = {
+      require(voterAddressSpecsByNodeId.containsKey(nodeId),
+        s"If ${KafkaConfig.ProcessRolesProp} contains the 'controller' role, 
the node id $nodeId must be included in the set of voters 
${KafkaConfig.QuorumVotersProp}=${voterAddressSpecsByNodeId.asScala.keySet.toSet}")
+    }
+    def validateControllerListenerExistsForKRaftController(): Unit = {
+      require(controllerListeners.nonEmpty,
+        s"${KafkaConfig.ControllerListenerNamesProp} must contain at least one 
value appearing in the '${KafkaConfig.ListenersProp}' configuration when 
running the KRaft controller role")
+    }
+    def 
validateControllerListenerNamesMustAppearInListenersForKRaftController(): Unit 
= {
+      val listenerNameValues = listeners.map(_.listenerName.value).toSet
+      require(controllerListenerNames.forall(cln => 
listenerNameValues.contains(cln)),
+        s"${KafkaConfig.ControllerListenerNamesProp} must only contain values 
appearing in the '${KafkaConfig.ListenersProp}' configuration when running the 
KRaft controller role")
+    }
+    def validateAdvertisedListenersNonEmptyForBroker(): Unit = {
+      require(advertisedListenerNames.nonEmpty,
+        "There must be at least one advertised listener." + (
+          if (processRoles.contains(BrokerRole)) s" Perhaps all listeners 
appear in ${ControllerListenerNamesProp}?" else ""))
+    }
+    if (processRoles == Set(BrokerRole)) {
+      // KRaft broker-only
+      validateNonEmptyQuorumVotersForKRaft()
+      validateControlPlaneListenerEmptyForKRaft()
+      
validateAdvertisedListenersDoesNotContainControllerListenersForKRaftBroker()
+      // nodeId must not appear in controller.quorum.voters
+      require(!voterAddressSpecsByNodeId.containsKey(nodeId),
+        s"If ${KafkaConfig.ProcessRolesProp} contains just the 'broker' role, 
the node id $nodeId must not be included in the set of voters 
${KafkaConfig.QuorumVotersProp}=${voterAddressSpecsByNodeId.asScala.keySet.toSet}")
+      // controller.listener.names must be non-empty...
+      require(controllerListenerNames.exists(_.nonEmpty),
+        s"${KafkaConfig.ControllerListenerNamesProp} must contain at least one 
value when running KRaft with just the broker role")
+      // controller.listener.names are forbidden in listeners...
+      require(controllerListeners.isEmpty,
+        s"${KafkaConfig.ControllerListenerNamesProp} must not contain a value 
appearing in the '${KafkaConfig.ListenersProp}' configuration when running 
KRaft with just the broker role")
+      // controller.listener.names must all appear in 
listener.security.protocol.map
+      controllerListenerNames.filter(_.nonEmpty).foreach { name =>

Review comment:
       Why do we allow empty controller listener names?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to