This is an automated email from the ASF dual-hosted git repository.
gnodet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mina-sshd.git
The following commit(s) were added to refs/heads/master by this push:
new 45f84aa [SSHD-1003] Use asynchronous streams when forwarding ports
45f84aa is described below
commit 45f84aab59b2e11d72942cffe9d810e37ab64959
Author: Guillaume Nodet <[email protected]>
AuthorDate: Thu Jun 4 10:15:54 2020 +0200
[SSHD-1003] Use asynchronous streams when forwarding ports
---
.../apache/sshd/common/io/nio2/Nio2Session.java | 1 -
.../sshd/server/forward/TcpipServerChannel.java | 47 +++++++++-------------
2 files changed, 20 insertions(+), 28 deletions(-)
diff --git
a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
index 1d5fc45..d595e90 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
@@ -368,7 +368,6 @@ public class Nio2Session extends AbstractCloseable
implements IoSession {
log.debug("handleReadCycleCompletion({}) Socket has been
disconnected (result={}), closing IoSession now",
this, result);
}
- close(true);
}
} catch (Throwable exc) {
completionHandler.failed(exc, attachment);
diff --git
a/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
b/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
index 6ec0629..aa07126 100644
---
a/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
+++
b/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
@@ -19,7 +19,6 @@
package org.apache.sshd.server.forward;
import java.io.IOException;
-import java.io.OutputStream;
import java.net.ConnectException;
import java.net.SocketAddress;
import java.util.Collections;
@@ -31,9 +30,10 @@ import org.apache.sshd.common.Closeable;
import org.apache.sshd.common.FactoryManager;
import org.apache.sshd.common.RuntimeSshException;
import org.apache.sshd.common.SshConstants;
+import org.apache.sshd.common.channel.BufferedIoOutputStream;
import org.apache.sshd.common.channel.Channel;
+import org.apache.sshd.common.channel.ChannelAsyncOutputStream;
import org.apache.sshd.common.channel.ChannelFactory;
-import org.apache.sshd.common.channel.ChannelOutputStream;
import org.apache.sshd.common.channel.Window;
import org.apache.sshd.common.channel.exception.SshChannelOpenException;
import org.apache.sshd.common.forward.ForwardingTunnelEndpointsProvider;
@@ -41,6 +41,7 @@ import org.apache.sshd.common.future.CloseFuture;
import org.apache.sshd.common.io.IoConnectFuture;
import org.apache.sshd.common.io.IoConnector;
import org.apache.sshd.common.io.IoHandler;
+import org.apache.sshd.common.io.IoOutputStream;
import org.apache.sshd.common.io.IoServiceFactory;
import org.apache.sshd.common.io.IoSession;
import org.apache.sshd.common.session.Session;
@@ -95,7 +96,7 @@ public class TcpipServerChannel extends AbstractServerChannel
implements Forward
private final ForwardingFilter.Type type;
private IoConnector connector;
private IoSession ioSession;
- private OutputStream out;
+ private IoOutputStream out;
private SshdSocketAddress tunnelEntrance;
private SshdSocketAddress tunnelExit;
private SshdSocketAddress originatorAddress;
@@ -196,9 +197,19 @@ public class TcpipServerChannel extends
AbstractServerChannel implements Forward
throw new RuntimeSshException(e);
}
- // TODO: revisit for better threading. Use async io ?
- out = new ChannelOutputStream(
- this, getRemoteWindow(), log,
SshConstants.SSH_MSG_CHANNEL_DATA, true);
+ out = new BufferedIoOutputStream(
+ "tcpip channel", new ChannelAsyncOutputStream(this,
SshConstants.SSH_MSG_CHANNEL_DATA) {
+ @SuppressWarnings("synthetic-access")
+ @Override
+ protected CloseFuture doCloseGracefully() {
+ try {
+ sendEof();
+ } catch (IOException e) {
+ session.exceptionCaught(e);
+ }
+ return super.doCloseGracefully();
+ }
+ });
IoHandler handler = new IoHandler() {
@Override
@SuppressWarnings("synthetic-access")
@@ -208,10 +219,9 @@ public class TcpipServerChannel extends
AbstractServerChannel implements Forward
log.debug("doInit({}) Ignoring write to channel in
CLOSING state", TcpipServerChannel.this);
}
} else {
- Buffer buffer = new ByteArrayBuffer(message.available() +
Long.SIZE, false);
+ Buffer buffer = new ByteArrayBuffer(message.available(),
false);
buffer.putBuffer(message);
- out.write(buffer.array(), buffer.rpos(),
buffer.available());
- out.flush();
+ out.writePacket(buffer);
}
}
@@ -302,24 +312,7 @@ public class TcpipServerChannel extends
AbstractServerChannel implements Forward
@Override
protected Closeable getInnerCloseable() {
return builder()
- .run(toString(), () -> {
- /*
- * In case of graceful shutdown (e.g. when the remote
channel is gently closed) we also need to
- * close the ChannelOutputStream which flushes remaining
buffer and sends SSH_MSG_CHANNEL_EOF back
- * to the client.
- */
- if (out != null) {
- try {
- if (log.isDebugEnabled()) {
- log.debug("Closing channel output stream of
{}", this);
- }
- out.close();
- } catch (IOException | RuntimeException ignored) {
- log.debug("{} while closing channel output stream
of {}: {}",
- ignored.getClass().getSimpleName(), this,
ignored.getMessage(), ignored);
- }
- }
- })
+ .close(out)
.close(super.getInnerCloseable())
.close(new AbstractCloseable() {
private final CloseableExecutorService executor