jsancio commented on code in PR #18387: URL: https://github.com/apache/kafka/pull/18387#discussion_r1942018528
########## core/src/main/scala/kafka/server/KafkaConfig.scala: ########## @@ -560,7 +560,17 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) controllerListenerNames.flatMap { name => controllerAdvertisedListeners .find(endpoint => endpoint.listenerName.equals(ListenerName.normalised(name))) - .orElse(controllerListenersValue.find(endpoint => endpoint.listenerName.equals(ListenerName.normalised(name)))) + .orElse( + // If users don't define advertised.listeners, the advertised controller listeners inherit from listeners configuration + // which match listener names in controller.listener.names. Also, removing "0.0.0.0" host to avoid validation errors. + controllerListenersValue + .find(endpoint => endpoint.listenerName.equals(ListenerName.normalised(name))) + .map(endpoint => if (endpoint.host == "0.0.0.0") { + new EndPoint(null, endpoint.port, endpoint.listenerName, endpoint.securityProtocol) Review Comment: I think that `null` or `""` works in some network because in the `ControllerServer` code we do a reverse lookup. ```scala val listenerInfo = ListenerInfo .create(config.effectiveAdvertisedControllerListeners.map(_.toJava).asJava) .withWildcardHostnamesResolved() .withEphemeralPortsCorrected(name => socketServer.boundPort(new ListenerName(name))) ``` Notice the use of `withWildcardHostnamesResolved`. Can you document this in the comment above? I should also mentioned that `0.0.0.0` is IPv4. Kafka also supports IPv6 which is encoded as `::`. ########## core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala: ########## @@ -1278,6 +1278,44 @@ class KafkaConfigTest { KafkaConfig.fromProps(props) } + @Test + def testImplicitAllBindingControllerListenersCanBeAdvertisedForKRaftCombined(): Unit = { + val props = new Properties() + props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "controller") Review Comment: The name of the method doesn't look correct since the process role is just controller. ########## core/src/main/scala/kafka/server/KafkaConfig.scala: ########## @@ -575,7 +585,8 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) if (advertisedListenersProp != null) { CoreUtils.listenerListToEndPoints(advertisedListenersProp, effectiveListenerSecurityProtocolMap, requireDistinctPorts=false) } else { - listeners + // Only use implicit broker listeners here. Implicit controller listeners are handled in effectiveAdvertisedControllerListeners. + listeners.filterNot(l => controllerListenerNames.contains(l.listenerName.value())) Review Comment: Do you need this? `advertisedListeners` is a private method that contains all of the advertise listeners if it is set or listeners if it is not set. `effectiveAdvertisedBrokerListeners` and `effectiveAdvertisedControllerListeners` do the necessary filtering. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org