[ 
https://issues.apache.org/jira/browse/KAFKA-13457?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Haoze Wu updated KAFKA-13457:
-----------------------------
    Description: 
When the kafka.network.Acceptor in SocketServer.scala accepts a new connection 
in the `accept` function, it handles the `TooManyConnectionsException` and 
`ConnectionThrottledException`. However, the socketChannel operations (line 720 
or 721 or 722) within the try block may potentially throw an IOException as 
well, which is not handled.

 
{code:java}
//core/src/main/scala/kafka/network/SocketServer.scala
// Acceptor class
  private def accept(key: SelectionKey): Option[SocketChannel] = {
    val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
    val socketChannel = serverSocketChannel.accept()     // line 717
    try {
      connectionQuotas.inc(endPoint.listenerName, 
socketChannel.socket.getInetAddress, blockedPercentMeter)
      socketChannel.configureBlocking(false)             // line 720
      socketChannel.socket().setTcpNoDelay(true)         // line 721
      socketChannel.socket().setKeepAlive(true)          // line 722
      if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
        socketChannel.socket().setSendBufferSize(sendBufferSize)
      Some(socketChannel)
    } catch {
      case e: TooManyConnectionsException =>       
        info(s"Rejected connection from ${e.ip}, address already has the 
configured maximum of ${e.count} connections.")
        close(endPoint.listenerName, socketChannel)
        None
      case e: ConnectionThrottledException => 
        val ip = socketChannel.socket.getInetAddress
        debug(s"Delaying closing of connection from $ip for ${e.throttleTimeMs} 
ms")
        val endThrottleTimeMs = e.startThrottleTimeMs + e.throttleTimeMs
        throttledSockets += DelayedCloseSocket(socketChannel, endThrottleTimeMs)
        None
    }
  }
{code}
This thrown IOException is caught in the caller `acceptNewConnections` in line 
706, which only prints an error message. The socketChannel that throws this 
IOException is not closed.

 
{code:java}
//core/src/main/scala/kafka/network/SocketServer.scala
  private def acceptNewConnections(): Unit = {
    val ready = nioSelector.select(500)
    if (ready > 0) {
      val keys = nioSelector.selectedKeys()
      val iter = keys.iterator()
      while (iter.hasNext && isRunning) {
        try {
          val key = iter.next
          iter.remove()          if (key.isAcceptable) {
            accept(key).foreach { socketChannel => 
                ...
              } while (!assignNewConnection(socketChannel, processor, 
retriesLeft == 0))
            }
          } else
            throw new IllegalStateException("Unrecognized key state for 
acceptor thread.")
        } catch {
          case e: Throwable => error("Error while accepting connection", e)   
// line 706
        }
      }
    }
  }
{code}
During testing, we found that this causes our Kafka clients to experience errors 
(InvalidReplicationFactorException) for 40+ seconds when creating new topics. 
After 40 seconds, the clients are able to create new topics successfully.

We verified that after adding socketChannel.close() upon IOException, the 
symptoms disappear, so the clients no longer need to wait 40 seconds before 
working again.

 

 

  was:
When the kafka.network.Acceptor in SocketServer.scala accepts a new connection 
in the `accept` function, it handles the `TooManyConnectionsException` and 
`ConnectionThrottledException`. However, line 717 or the socketChannel 
operations within the try block may potentially throw an IOException as well, 
which is not handled.

 
{code:java}
//core/src/main/scala/kafka/network/SocketServer.scala
// Acceptor class
  private def accept(key: SelectionKey): Option[SocketChannel] = {
    val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
    val socketChannel = serverSocketChannel.accept()     // line 717
    try {
      connectionQuotas.inc(endPoint.listenerName, 
socketChannel.socket.getInetAddress, blockedPercentMeter)
      socketChannel.configureBlocking(false)             
      socketChannel.socket().setTcpNoDelay(true)         
      socketChannel.socket().setKeepAlive(true)          
      if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
        socketChannel.socket().setSendBufferSize(sendBufferSize)
      Some(socketChannel)
    } catch {
      case e: TooManyConnectionsException =>       
        info(s"Rejected connection from ${e.ip}, address already has the 
configured maximum of ${e.count} connections.")
        close(endPoint.listenerName, socketChannel)
        None
      case e: ConnectionThrottledException => 
        val ip = socketChannel.socket.getInetAddress
        debug(s"Delaying closing of connection from $ip for ${e.throttleTimeMs} 
ms")
        val endThrottleTimeMs = e.startThrottleTimeMs + e.throttleTimeMs
        throttledSockets += DelayedCloseSocket(socketChannel, endThrottleTimeMs)
        None
    }
  }
{code}
This thrown IOException is caught in the caller `acceptNewConnections` in line 
706, which only prints an error message. The socketChannel that throws this 
IOException is not closed.

 
{code:java}
//core/src/main/scala/kafka/network/SocketServer.scala
  private def acceptNewConnections(): Unit = {
    val ready = nioSelector.select(500)
    if (ready > 0) {
      val keys = nioSelector.selectedKeys()
      val iter = keys.iterator()
      while (iter.hasNext && isRunning) {
        try {
          val key = iter.next
          iter.remove()          if (key.isAcceptable) {
            accept(key).foreach { socketChannel => 
                ...
              } while (!assignNewConnection(socketChannel, processor, 
retriesLeft == 0))
            }
          } else
            throw new IllegalStateException("Unrecognized key state for 
acceptor thread.")
        } catch {
          case e: Throwable => error("Error while accepting connection", e)   
// line 706
        }
      }
    }
  }
{code}
During testing, we found that this causes our Kafka clients to experience errors 
(InvalidReplicationFactorException) for 40+ seconds when creating new topics. 
After 40 seconds, the clients are able to create new topics successfully.

We verified that after adding socketChannel.close() upon IOException, the 
symptoms disappear, so the clients no longer need to wait 40 seconds before 
working again.

 

 


> socketChannel in Acceptor#accept is not closed upon IOException
> ---------------------------------------------------------------
>
>                 Key: KAFKA-13457
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13457
>             Project: Kafka
>          Issue Type: Bug
>          Components: network
>    Affects Versions: 2.8.0
>            Reporter: Haoze Wu
>            Priority: Major
>
> When the kafka.network.Acceptor in SocketServer.scala accepts a new 
> connection in the `accept` function, it handles the 
> `TooManyConnectionsException` and `ConnectionThrottledException`. However, 
> the socketChannel operations (line 720 or 721 or 722) within the try block 
> may potentially throw an IOException as well, which is not handled.
>  
> {code:java}
> //core/src/main/scala/kafka/network/SocketServer.scala
> // Acceptor class
>   private def accept(key: SelectionKey): Option[SocketChannel] = {
>     val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
>     val socketChannel = serverSocketChannel.accept()     // line 717
>     try {
>       connectionQuotas.inc(endPoint.listenerName, 
> socketChannel.socket.getInetAddress, blockedPercentMeter)
>       socketChannel.configureBlocking(false)             // line 720
>       socketChannel.socket().setTcpNoDelay(true)         // line 721
>       socketChannel.socket().setKeepAlive(true)          // line 722
>       if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
>         socketChannel.socket().setSendBufferSize(sendBufferSize)
>       Some(socketChannel)
>     } catch {
>       case e: TooManyConnectionsException =>       
>         info(s"Rejected connection from ${e.ip}, address already has the 
> configured maximum of ${e.count} connections.")
>         close(endPoint.listenerName, socketChannel)
>         None
>       case e: ConnectionThrottledException => 
>         val ip = socketChannel.socket.getInetAddress
>         debug(s"Delaying closing of connection from $ip for 
> ${e.throttleTimeMs} ms")
>         val endThrottleTimeMs = e.startThrottleTimeMs + e.throttleTimeMs
>         throttledSockets += DelayedCloseSocket(socketChannel, 
> endThrottleTimeMs)
>         None
>     }
>   }
> {code}
> This thrown IOException is caught in the caller `acceptNewConnections` in 
> line 706, which only prints an error message. The socketChannel that throws 
> this IOException is not closed.
>  
> {code:java}
> //core/src/main/scala/kafka/network/SocketServer.scala
>   private def acceptNewConnections(): Unit = {
>     val ready = nioSelector.select(500)
>     if (ready > 0) {
>       val keys = nioSelector.selectedKeys()
>       val iter = keys.iterator()
>       while (iter.hasNext && isRunning) {
>         try {
>           val key = iter.next
>           iter.remove()          if (key.isAcceptable) {
>             accept(key).foreach { socketChannel => 
>                 ...
>               } while (!assignNewConnection(socketChannel, processor, 
> retriesLeft == 0))
>             }
>           } else
>             throw new IllegalStateException("Unrecognized key state for 
> acceptor thread.")
>         } catch {
>           case e: Throwable => error("Error while accepting connection", e)   
> // line 706
>         }
>       }
>     }
>   }
> {code}
> During testing, we found that this causes our Kafka clients to experience 
> errors (InvalidReplicationFactorException) for 40+ seconds when creating new 
> topics. After 40 seconds, the clients are able to create new topics 
> successfully.
> We verified that after adding socketChannel.close() upon IOException, the 
> symptoms disappear, so the clients no longer need to wait 40 seconds before 
> working again.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to