mdedetrich commented on code in PR #11478: URL: https://github.com/apache/kafka/pull/11478#discussion_r928617296
########## core/src/main/scala/kafka/utils/CoreUtils.scala: ########## @@ -252,16 +255,57 @@ object CoreUtils { listenerListToEndPoints(listeners, securityProtocolMap, true) } + def validateOneIsIpv4AndOtherIpv6(first: String, second: String): Boolean = + (inetAddressValidator.isValidInet4Address(first) && inetAddressValidator.isValidInet6Address(second)) || + (inetAddressValidator.isValidInet6Address(first) && inetAddressValidator.isValidInet4Address(second)) + + def checkDuplicateListenerPorts(endpoints: Seq[EndPoint], listeners: String): Unit = { + val distinctPorts = endpoints.map(_.port).distinct + require(distinctPorts.size == endpoints.map(_.port).size, s"Each listener must have a different port, listeners: $listeners") + } + def listenerListToEndPoints(listeners: String, securityProtocolMap: Map[ListenerName, SecurityProtocol], requireDistinctPorts: Boolean): Seq[EndPoint] = { def validate(endPoints: Seq[EndPoint]): Unit = { - // filter port 0 for unit tests - val portsExcludingZero = endPoints.map(_.port).filter(_ != 0) val distinctListenerNames = endPoints.map(_.listenerName).distinct - require(distinctListenerNames.size == endPoints.size, s"Each listener must have a different name, listeners: $listeners") - if (requireDistinctPorts) { - val distinctPorts = portsExcludingZero.distinct - require(distinctPorts.size == portsExcludingZero.size, s"Each listener must have a different port, listeners: $listeners") + + val (duplicatePorts, nonDuplicatePorts) = endPoints.filter { + // filter port 0 for unit tests + ep => ep.port != 0 + }.groupBy(_.port).partition { + case (_, endpoints) => endpoints.size > 1 + } + + val nonDuplicatePortsOnlyEndpoints = nonDuplicatePorts.flatMap { case (_, eps) => eps }.toList + + if (requireDistinctPorts) + checkDuplicateListenerPorts(nonDuplicatePortsOnlyEndpoints, listeners) + + // Exception case, let's allow duplicate ports if one host is on IPv4 and the other one is on IPv6 + val duplicatePortsPartitionedByValidIps = 
duplicatePorts.map { + case (port, eps) => + (port, eps.partition(ep => + ep.host != null && inetAddressValidator.isValid(ep.host) + )) + } + + // Iterate through every grouping of duplicates by port to see if they are valid + duplicatePortsPartitionedByValidIps.foreach { + case (port, (duplicatesWithIpHosts, duplicatesWithoutIpHosts)) => + if (requireDistinctPorts) + checkDuplicateListenerPorts(duplicatesWithoutIpHosts, listeners) Review Comment: > Is that what we expected? I thought in this PR(KIP), we allow to use the same port only when one is in ipv4, and the other one is in ipv6. But this case also allow the 3rd set of same port listener. Is it correct? No, it's not expected; I have to explicitly check for null hosts (I didn't realize they existed). I will work on this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org