This is an automated email from the ASF dual-hosted git repository.
gnodet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mina-sshd.git
The following commit(s) were added to refs/heads/master by this push:
new 1cf3056 [SSHD-1079] Async mode on the local port forwarder (#167)
1cf3056 is described below
commit 1cf3056cfddcdbf9aef6c96ef662f5e001477f97
Author: Guillaume Nodet <[email protected]>
AuthorDate: Tue Sep 22 08:30:06 2020 +0200
[SSHD-1079] Async mode on the local port forwarder (#167)
Disabled by default
---
.../org/apache/sshd/common/forward/SocksProxy.java | 12 ++++++++---
.../sshd/common/forward/TcpipClientChannel.java | 25 +++++++++++++++++-----
2 files changed, 29 insertions(+), 8 deletions(-)
diff --git
a/sshd-core/src/main/java/org/apache/sshd/common/forward/SocksProxy.java
b/sshd-core/src/main/java/org/apache/sshd/common/forward/SocksProxy.java
index bdb0b7f..f87d35c 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/forward/SocksProxy.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/SocksProxy.java
@@ -27,6 +27,7 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.sshd.client.future.OpenFuture;
import org.apache.sshd.common.SshException;
import org.apache.sshd.common.io.IoHandler;
+import org.apache.sshd.common.io.IoOutputStream;
import org.apache.sshd.common.io.IoSession;
import org.apache.sshd.common.session.ConnectionService;
import org.apache.sshd.common.util.buffer.Buffer;
@@ -100,9 +101,14 @@ public class SocksProxy extends AbstractCloseable
implements IoHandler {
}
protected void onMessage(Buffer buffer) throws IOException {
- OutputStream invertedIn = channel.getInvertedIn();
- invertedIn.write(buffer.array(), buffer.rpos(),
buffer.available());
- invertedIn.flush();
+ IoOutputStream asyncIn = channel.getAsyncIn();
+ if (asyncIn != null) {
+ asyncIn.writePacket(buffer);
+ } else {
+ OutputStream invertedIn = channel.getInvertedIn();
+ invertedIn.write(buffer.array(), buffer.rpos(),
buffer.available());
+ invertedIn.flush();
+ }
}
@Override
diff --git
a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java
b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java
index ae78076..c743948 100644
---
a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java
+++
b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java
@@ -33,8 +33,11 @@ import org.apache.sshd.common.Closeable;
import org.apache.sshd.common.NamedResource;
import org.apache.sshd.common.SshConstants;
import org.apache.sshd.common.SshException;
+import org.apache.sshd.common.channel.ChannelAsyncInputStream;
+import org.apache.sshd.common.channel.ChannelAsyncOutputStream;
import org.apache.sshd.common.channel.ChannelOutputStream;
import org.apache.sshd.common.channel.Window;
+import org.apache.sshd.common.future.CloseFuture;
import org.apache.sshd.common.io.IoSession;
import org.apache.sshd.common.session.Session;
import org.apache.sshd.common.util.ValidateUtils;
@@ -161,12 +164,24 @@ public class TcpipClientChannel extends
AbstractClientChannel implements Forward
@Override
protected synchronized void doOpen() throws IOException {
if (streaming == Streaming.Async) {
- throw new IllegalArgumentException("Asynchronous streaming isn't
supported yet on this channel");
+ asyncIn = new ChannelAsyncOutputStream(this,
SshConstants.SSH_MSG_CHANNEL_DATA) {
+ @SuppressWarnings("synthetic-access")
+ @Override
+ protected CloseFuture doCloseGracefully() {
+ try {
+ sendEof();
+ } catch (IOException e) {
+ getSession().exceptionCaught(e);
+ }
+ return super.doCloseGracefully();
+ }
+ };
+ asyncOut = new ChannelAsyncInputStream(this);
+ } else {
+ out = new ChannelOutputStream(
+ this, getRemoteWindow(), log,
SshConstants.SSH_MSG_CHANNEL_DATA, true);
+ invertedIn = out;
}
-
- out = new ChannelOutputStream(
- this, getRemoteWindow(), log,
SshConstants.SSH_MSG_CHANNEL_DATA, true);
- invertedIn = out;
}
@Override