mdedetrich commented on code in PR #11478: URL: https://github.com/apache/kafka/pull/11478#discussion_r928747703
########## core/src/main/scala/kafka/utils/CoreUtils.scala: ########## @@ -252,16 +255,57 @@ object CoreUtils { listenerListToEndPoints(listeners, securityProtocolMap, true) } + def validateOneIsIpv4AndOtherIpv6(first: String, second: String): Boolean = + (inetAddressValidator.isValidInet4Address(first) && inetAddressValidator.isValidInet6Address(second)) || + (inetAddressValidator.isValidInet6Address(first) && inetAddressValidator.isValidInet4Address(second)) + + def checkDuplicateListenerPorts(endpoints: Seq[EndPoint], listeners: String): Unit = { + val distinctPorts = endpoints.map(_.port).distinct + require(distinctPorts.size == endpoints.map(_.port).size, s"Each listener must have a different port, listeners: $listeners") + } + def listenerListToEndPoints(listeners: String, securityProtocolMap: Map[ListenerName, SecurityProtocol], requireDistinctPorts: Boolean): Seq[EndPoint] = { def validate(endPoints: Seq[EndPoint]): Unit = { - // filter port 0 for unit tests - val portsExcludingZero = endPoints.map(_.port).filter(_ != 0) val distinctListenerNames = endPoints.map(_.listenerName).distinct - require(distinctListenerNames.size == endPoints.size, s"Each listener must have a different name, listeners: $listeners") - if (requireDistinctPorts) { - val distinctPorts = portsExcludingZero.distinct - require(distinctPorts.size == portsExcludingZero.size, s"Each listener must have a different port, listeners: $listeners") + + val (duplicatePorts, nonDuplicatePorts) = endPoints.filter { + // filter port 0 for unit tests + ep => ep.port != 0 + }.groupBy(_.port).partition { + case (_, endpoints) => endpoints.size > 1 + } + + val nonDuplicatePortsOnlyEndpoints = nonDuplicatePorts.flatMap { case (_, eps) => eps }.toList + + if (requireDistinctPorts) + checkDuplicateListenerPorts(nonDuplicatePortsOnlyEndpoints, listeners) + + // Exception case, let's allow duplicate ports if one host is on IPv4 and the other one is on IPv6 + val duplicatePortsPartitionedByValidIps = 
duplicatePorts.map { + case (port, eps) => + (port, eps.partition(ep => + ep.host != null && inetAddressValidator.isValid(ep.host) + )) + } + + // Iterate through every grouping of duplicates by port to see if they are valid + duplicatePortsPartitionedByValidIps.foreach { + case (port, (duplicatesWithIpHosts, duplicatesWithoutIpHosts)) => + if (requireDistinctPorts) + checkDuplicateListenerPorts(duplicatesWithoutIpHosts, listeners) Review Comment: So I have just pushed a commit `Handle duplicatesWithoutIpHosts and duplicatesWithoutIpHosts nonEmpty case` which solves this issue by checking if `duplicatesWithoutIpHosts` is non-empty even if `duplicatesWithIpHosts` happens to be valid for a given port. A comment has been added to make clear what's going on. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org