[ https://issues.apache.org/jira/browse/HADOOP-18024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Masatake Iwasaki resolved HADOOP-18024.
---------------------------------------
Fix Version/s: 3.4.0
Hadoop Flags: Reviewed
Resolution: Fixed
> SocketChannel is not closed when IOException happens in Server$Listener.doAccept
> --------------------------------------------------------------------------------
>
> Key: HADOOP-18024
> URL: https://issues.apache.org/jira/browse/HADOOP-18024
> Project: Hadoop Common
> Issue Type: Bug
> Components: ipc
> Affects Versions: 3.2.2
> Reporter: Haoze Wu
> Assignee: Haoze Wu
> Priority: Major
> Labels: pull-request-available
> Fix For: 3.4.0
>
> Time Spent: 3.5h
> Remaining Estimate: 0h
>
> This is a follow-up to HADOOP-17552.
> When the symptom described in HADOOP-17552 occurs, the client may time out
> after 2 minutes, according to the default RPC timeout configuration specified in
> HADOOP-17552. Until then, the client just waits and does not know that the
> problem has occurred.
> However, we recently found that the client does not actually need to waste
> these 2 minutes, and the server's availability can also be improved. If the
> IOException happens at line 1402, 1403, or 1404, we can simply close the
> problematic `SocketChannel` and continue to accept new socket connections.
> The client can then notice the closed socket immediately, instead of
> waiting 2 minutes.
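The client-side effect can be seen with plain `java.net` sockets. The following is a minimal sketch, not Hadoop's RPC client: `EofDemo` is a hypothetical class, and the 2-minute `SO_TIMEOUT` is an assumption chosen to echo the RPC timeout above. Once the server closes the accepted socket, the client's blocking `read()` returns -1 (EOF) as soon as the close reaches it, rather than failing only when the read timeout expires.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

final class EofDemo {
  /**
   * Connects a client to a loopback server, lets the server close the accepted
   * socket (as the proposed doAccept does after an IOException), and returns
   * the result of the client's first read: -1 means the client saw EOF.
   */
  static int readAfterServerClose() throws IOException {
    InetAddress loopback = InetAddress.getLoopbackAddress();
    try (ServerSocket server = new ServerSocket(0, 50, loopback);
         Socket client = new Socket(loopback, server.getLocalPort())) {
      // Assumed 2-minute read timeout, mirroring the RPC timeout discussed above.
      client.setSoTimeout(120_000);

      // The connection is already established, so accept() returns immediately.
      Socket accepted = server.accept();
      // The server gives up on this connection and closes it.
      accepted.close();

      InputStream in = client.getInputStream();
      // Returns -1 once the server's close arrives, long before the timeout.
      return in.read();
    }
  }

  public static void main(String[] args) throws IOException {
    long start = System.nanoTime();
    int result = readAfterServerClose();
    long millis = (System.nanoTime() - start) / 1_000_000;
    System.out.println("read() = " + result + " after " + millis + " ms");
  }
}
```

Because the client sends nothing before the server closes, the close is delivered as an orderly shutdown and shows up as EOF rather than as a connection-reset exception.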
> The old implementation:
> {code:java}
> // hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
> public void run() {
>   while (running) {
>     // ...
>     try {
>       // ...
>       while (iter.hasNext()) {
>         // ...
>         try {
>           if (key.isValid()) {
>             if (key.isAcceptable())
>               doAccept(key); // line 1348
>           }
>         } catch (IOException e) { // line 1350
>         }
>         // ...
>       }
>     } catch (OutOfMemoryError e) {
>       // ...
>     } catch (Exception e) {
>       // ...
>     }
>   }
> }
> {code}
> {code:java}
> // hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
> void doAccept(SelectionKey key) throws InterruptedException, IOException,
>     OutOfMemoryError {
>   ServerSocketChannel server = (ServerSocketChannel) key.channel();
>   SocketChannel channel;
>   while ((channel = server.accept()) != null) { // line 1400
>     channel.configureBlocking(false);           // line 1402
>     channel.socket().setTcpNoDelay(tcpNoDelay); // line 1403
>     channel.socket().setKeepAlive(true);        // line 1404
>     Reader reader = getReader();
>     Connection c = connectionManager.register(channel,
>         this.listenPort, this.isOnAuxiliaryPort);
>     // If the connectionManager can't take it, close the connection.
>     if (c == null) {
>       if (channel.isOpen()) {
>         IOUtils.cleanup(null, channel);
>       }
>       connectionManager.droppedConnections.getAndIncrement();
>       continue;
>     }
>     key.attach(c); // so closeCurrentConnection can get the object
>     reader.addConnection(c);
>   }
> }
> {code}
>
> We propose the following implementation instead:
> {code:java}
> void doAccept(SelectionKey key) throws InterruptedException, IOException,
>     OutOfMemoryError {
>   ServerSocketChannel server = (ServerSocketChannel) key.channel();
>   SocketChannel channel;
>   while ((channel = server.accept()) != null) { // line 1400
>     try {
>       channel.configureBlocking(false);           // line 1402
>       channel.socket().setTcpNoDelay(tcpNoDelay); // line 1403
>       channel.socket().setKeepAlive(true);        // line 1404
>     } catch (IOException e) {
>       LOG.warn(...);
>       try {
>         channel.socket().close();
>         channel.close();
>       } catch (IOException ignored) { }
>       continue;
>     }
>     // ...
>   }
> }
> {code}
> The advantages include:
> # {*}In the old implementation{*}, a single exception on this one `SocketChannel`
> causes the `ServerSocketChannel` to be abandoned, because the exception
> propagates out of `doAccept` to the handler at line 1350. {*}In the new
> implementation{*}, a try-catch handles the exception at line 1402, 1403, or
> 1404, so the `ServerSocketChannel` can continue to accept new connections
> instead of waiting until the next iteration of the `while` loop in the `run`
> method reaches line 1348 again.
> # {*}In the old implementation{*}, the client (the other endpoint of this
> `SocketChannel`) is unaware of the problem, because the `SocketChannel` has
> been accepted but is never closed. {*}In the new implementation{*}, we close the
> `SocketChannel` when the IOException happens, so the client immediately gets
> EOF from the socket and can decide, at its own discretion, whether to retry or
> throw an exception.
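Both advantages can be reproduced without real sockets. The sketch below is a simplified model, not Hadoop code: `AcceptModel`, `FakeChannel`, `acceptOld`, and `acceptNew` are hypothetical names, and a queue of fake channels stands in for the accept backlog, returning `null` when nothing is pending. In the old loop the configuration failure escapes (as it escapes `doAccept` to the handler at line 1350), leaving the failed channel open and the next connection unaccepted; the new loop closes the failed channel and keeps going.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;

final class AcceptModel {
  /** Hypothetical stand-in for an accepted SocketChannel. */
  static final class FakeChannel implements Closeable {
    final String name;
    final boolean failsConfiguration;
    boolean closed;

    FakeChannel(String name, boolean failsConfiguration) {
      this.name = name;
      this.failsConfiguration = failsConfiguration;
    }

    /** Models configureBlocking / setTcpNoDelay / setKeepAlive throwing. */
    void configure() throws IOException {
      if (failsConfiguration) {
        throw new IOException("configure failed for " + name);
      }
    }

    @Override
    public void close() {
      closed = true;
    }
  }

  /** Old behaviour: the IOException leaves the loop; the channel stays open. */
  static void acceptOld(Queue<FakeChannel> pending, List<FakeChannel> registered)
      throws IOException {
    FakeChannel channel;
    while ((channel = pending.poll()) != null) {
      channel.configure();
      registered.add(channel);
    }
  }

  /** New behaviour: close the failed channel and keep accepting. */
  static void acceptNew(Queue<FakeChannel> pending, List<FakeChannel> registered) {
    FakeChannel channel;
    while ((channel = pending.poll()) != null) {
      try {
        channel.configure();
      } catch (IOException e) {
        System.err.println("WARN " + e.getMessage());
        channel.close();
        continue;
      }
      registered.add(channel);
    }
  }

  /** Three pending connections; the middle one fails configuration. */
  static Queue<FakeChannel> pendingConnections() {
    Queue<FakeChannel> queue = new ArrayDeque<>();
    queue.add(new FakeChannel("a", false));
    queue.add(new FakeChannel("b", true));
    queue.add(new FakeChannel("c", false));
    return queue;
  }
}
```

With real channels, `Queue.poll()` corresponds to `ServerSocketChannel.accept()`, which in non-blocking mode also returns `null` when no connection is pending, and `configure()` corresponds to lines 1402 to 1404.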
>
> We have confirmed that this patch works as expected on our local machine.
>
> This code pattern has also been adopted by other projects. For example, in Kafka
> [https://github.com/apache/kafka/blob/23e9818e625976c22fe6d4297a5ab76b01f92ef6/core/src/main/scala/kafka/network/SocketServer.scala#L714-L740]:
> {code:java}
> /**
>  * Accept a new connection
>  */
> private def accept(key: SelectionKey): Option[SocketChannel] = {
>   val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
>   val socketChannel = serverSocketChannel.accept()
>   try {
>     connectionQuotas.inc(endPoint.listenerName,
>       socketChannel.socket.getInetAddress, blockedPercentMeter)
>     configureAcceptedSocketChannel(socketChannel)
>     Some(socketChannel)
>   } catch {
>     case e: TooManyConnectionsException =>
>       info(...)
>       close(endPoint.listenerName, socketChannel)
>       None
>     case e: ConnectionThrottledException =>
>       // ...
>       None
>     case e: IOException =>
>       error(...)
>       close(endPoint.listenerName, socketChannel)
>       None
>   }
> }
>
> /**
>  * Close `channel` and decrement the connection count.
>  */
> def close(listenerName: ListenerName, channel: SocketChannel): Unit = {
>   if (channel != null) {
>     // ...
>     closeSocket(channel)
>   }
> }
>
> protected def closeSocket(channel: SocketChannel): Unit = {
>   CoreUtils.swallow(channel.socket().close(), this, Level.ERROR)
>   CoreUtils.swallow(channel.close(), this, Level.ERROR)
> }
> {code}
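One small difference between the two snippets: the proposed Hadoop code closes `channel.socket()` and `channel` inside a single `try`, so if the first `close()` throws, the second is skipped, whereas Kafka's `closeSocket` wraps each call in its own `CoreUtils.swallow`. A minimal Java sketch of that per-call pattern follows; `CloseUtil.closeEachQuietly` is a hypothetical helper, not a Hadoop or Kafka API, and it only catches `IOException` and logs to `System.err` as a simplifying assumption.

```java
import java.io.Closeable;
import java.io.IOException;

final class CloseUtil {
  /**
   * Closes each resource in order. A failing close() is logged and counted,
   * but does not prevent the remaining resources from being closed.
   * Null entries are skipped. Returns the number of close() calls that threw.
   */
  static int closeEachQuietly(Closeable... resources) {
    int failures = 0;
    for (Closeable resource : resources) {
      if (resource == null) {
        continue;
      }
      try {
        resource.close();
      } catch (IOException e) {
        failures++;
        System.err.println("ERROR closing " + resource + ": " + e);
      }
    }
    return failures;
  }
}
```

In the accept path this would be called as `CloseUtil.closeEachQuietly(channel.socket(), channel)`, since both `java.net.Socket` and `SocketChannel` implement `Closeable`.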
--
This message was sent by Atlassian Jira
(v8.20.1#820001)