mdedetrich commented on code in PR #11478:
URL: https://github.com/apache/kafka/pull/11478#discussion_r928933278


##########
core/src/main/scala/kafka/utils/CoreUtils.scala:
##########
@@ -252,16 +255,60 @@ object CoreUtils {
     listenerListToEndPoints(listeners, securityProtocolMap, true)
   }
 
+  def validateOneIsIpv4AndOtherIpv6(first: String, second: String): Boolean =
+    (inetAddressValidator.isValidInet4Address(first) && 
inetAddressValidator.isValidInet6Address(second)) ||
+      (inetAddressValidator.isValidInet6Address(first) && 
inetAddressValidator.isValidInet4Address(second))
+
+  def checkDuplicateListenerPorts(endpoints: Seq[EndPoint], listeners: 
String): Unit = {
+    val distinctPorts = endpoints.map(_.port).distinct
+    require(distinctPorts.size == endpoints.map(_.port).size, s"Each listener 
must have a different port, listeners: $listeners")
+  }
+
   def listenerListToEndPoints(listeners: String, securityProtocolMap: 
Map[ListenerName, SecurityProtocol], requireDistinctPorts: Boolean): 
Seq[EndPoint] = {
     def validate(endPoints: Seq[EndPoint]): Unit = {
-      // filter port 0 for unit tests
-      val portsExcludingZero = endPoints.map(_.port).filter(_ != 0)
       val distinctListenerNames = endPoints.map(_.listenerName).distinct
-
       require(distinctListenerNames.size == endPoints.size, s"Each listener 
must have a different name, listeners: $listeners")
-      if (requireDistinctPorts) {
-        val distinctPorts = portsExcludingZero.distinct
-        require(distinctPorts.size == portsExcludingZero.size, s"Each listener 
must have a different port, listeners: $listeners")
+
+      val (duplicatePorts, _) = endPoints.filter {
+        // filter port 0 for unit tests
+        ep => ep.port != 0
+      }.groupBy(_.port).partition {
+        case (_, endpoints) => endpoints.size > 1
+      }
+
+      // Exception case, let's allow duplicate ports if one host is on IPv4 
and the other one is on IPv6
+      val duplicatePortsPartitionedByValidIps = duplicatePorts.map {
+        case (port, eps) =>
+          (port, eps.partition(ep =>
+            ep.host != null && inetAddressValidator.isValid(ep.host)
+          ))
+      }
+
+      // Iterate through every grouping of duplicates by port to see if they 
are valid
+      duplicatePortsPartitionedByValidIps.foreach {
+        case (port, (duplicatesWithIpHosts, duplicatesWithoutIpHosts)) =>
+          if (requireDistinctPorts)
+            checkDuplicateListenerPorts(duplicatesWithoutIpHosts, listeners)

Review Comment:
   Doing so causes this test to fail
   
   ```
   props.put(KafkaConfig.ListenersProp, 
"PLAINTEXT://apache.org:9092,SSL://[::1]:9092")
   caught = assertThrows(classOf[IllegalArgumentException], () => 
KafkaConfig.fromProps(props))
   assertTrue(caught.getMessage.contains("Each listener must have a different 
port"))
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to