jsancio commented on code in PR #18387:
URL: https://github.com/apache/kafka/pull/18387#discussion_r1942018528


##########
core/src/main/scala/kafka/server/KafkaConfig.scala:
##########
@@ -560,7 +560,17 @@ class KafkaConfig private(doLog: Boolean, val props: 
util.Map[_, _])
     controllerListenerNames.flatMap { name =>
       controllerAdvertisedListeners
         .find(endpoint => 
endpoint.listenerName.equals(ListenerName.normalised(name)))
-        .orElse(controllerListenersValue.find(endpoint => 
endpoint.listenerName.equals(ListenerName.normalised(name))))
+        .orElse(
+          // If users don't define advertised.listeners, the advertised 
controller listeners inherit from listeners configuration
+          // which match listener names in controller.listener.names. Also, 
removing "0.0.0.0" host to avoid validation errors.
+          controllerListenersValue
+            .find(endpoint => 
endpoint.listenerName.equals(ListenerName.normalised(name)))
+            .map(endpoint => if (endpoint.host == "0.0.0.0") {
+              new EndPoint(null, endpoint.port, endpoint.listenerName, 
endpoint.securityProtocol)

Review Comment:
   I think that `null` or `""` works in some network because in the 
`ControllerServer` code we do a reverse lookup.
   ```scala
           val listenerInfo = ListenerInfo
             
.create(config.effectiveAdvertisedControllerListeners.map(_.toJava).asJava)
             .withWildcardHostnamesResolved()
             .withEphemeralPortsCorrected(name => socketServer.boundPort(new 
ListenerName(name)))
   ```
   Notice the use of `withWildcardHostnamesResolved`. Can you document this in 
the comment above?
   
   I should also mentioned that `0.0.0.0` is IPv4. Kafka also supports IPv6 
which is encoded as `::`.



##########
core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala:
##########
@@ -1278,6 +1278,44 @@ class KafkaConfigTest {
     KafkaConfig.fromProps(props)
   }
 
+  @Test
+  def 
testImplicitAllBindingControllerListenersCanBeAdvertisedForKRaftCombined(): 
Unit = {
+    val props = new Properties()
+    props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "controller")

Review Comment:
   The name of the method doesn't look correct since the process role is just 
controller.



##########
core/src/main/scala/kafka/server/KafkaConfig.scala:
##########
@@ -575,7 +585,8 @@ class KafkaConfig private(doLog: Boolean, val props: 
util.Map[_, _])
     if (advertisedListenersProp != null) {
       CoreUtils.listenerListToEndPoints(advertisedListenersProp, 
effectiveListenerSecurityProtocolMap, requireDistinctPorts=false)
     } else {
-      listeners
+      // Only use implicit broker listeners here. Implicit controller 
listeners are handled in effectiveAdvertisedControllerListeners.
+      listeners.filterNot(l => 
controllerListenerNames.contains(l.listenerName.value()))

Review Comment:
   Do you need this? `advertisedListeners` is a private method that contains 
all of the advertise listeners if it is set or listeners if it is not set. 
`effectiveAdvertisedBrokerListeners` and 
`effectiveAdvertisedControllerListeners` do the necessary filtering.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to