This is an automated email from the ASF dual-hosted git repository.
gnodet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mina-sshd.git
The following commit(s) were added to refs/heads/master by this push:
new 72d6c00 [SSHD-1070] Limit the amount of data that is kept in memory
for forwa… (#166)
72d6c00 is described below
commit 72d6c0086d2e86060e82e39b531338473f5195d0
Author: Guillaume Nodet <[email protected]>
AuthorDate: Tue Sep 22 08:29:22 2020 +0200
[SSHD-1070] Limit the amount of data that is kept in memory for forwa…
(#166)
* Add a switch to choose between sync / async modes for the
TcpipServerChannel
* Enable load tests
---
.../java/org/apache/sshd/common/io/IoSession.java | 19 ++++++
sshd-core/pom.xml | 3 -
.../apache/sshd/client/channel/ClientChannel.java | 11 +---
.../sshd/common/channel/SimpleIoOutputStream.java | 67 ++++++++++++++++++++
.../sshd/common/channel/StreamingChannel.java | 37 +++++++++++
.../apache/sshd/common/io/nio2/Nio2Session.java | 39 ++++++++++++
.../org/apache/sshd/core/CoreModuleProperties.java | 17 ++++++
.../sshd/server/forward/TcpipServerChannel.java | 71 +++++++++++++++++-----
.../src/test/java/org/apache/sshd/LoadTest.java | 2 -
9 files changed, 237 insertions(+), 29 deletions(-)
diff --git a/sshd-common/src/main/java/org/apache/sshd/common/io/IoSession.java
b/sshd-common/src/main/java/org/apache/sshd/common/io/IoSession.java
index 20dbb46..f8de2b4 100644
--- a/sshd-common/src/main/java/org/apache/sshd/common/io/IoSession.java
+++ b/sshd-common/src/main/java/org/apache/sshd/common/io/IoSession.java
@@ -93,4 +93,23 @@ public interface IoSession extends
ConnectionEndpointsIndicator, PacketWriter, C
* @throws IOException If failed to shutdown the stream
*/
void shutdownOutputStream() throws IOException;
+
+ /**
+ * Suspend read operations on this session. May do nothing if not
supported by the session implementation.
+ *
+ * If the session usage includes a graceful shutdown with messages being
exchanged, the caller needs to
+ * take care of resuming reading the input in order to actually be able to
carry on the conversation with
+ * the peer.
+ */
+ default void suspendRead() {
+ // Do nothing by default, but can be overriden by implementations
+ }
+
+ /**
+ * Resume read operations on this session. May do nothing if not supported
by the session implementation.
+ */
+ default void resumeRead() {
+ // Do nothing by default, but can be overriden by implementations
+ }
+
}
diff --git a/sshd-core/pom.xml b/sshd-core/pom.xml
index 48bffcd..8b4db65 100644
--- a/sshd-core/pom.xml
+++ b/sshd-core/pom.xml
@@ -184,9 +184,6 @@
<configuration>
<redirectTestOutputToFile>true</redirectTestOutputToFile>
<reportsDirectory>${project.build.directory}/surefire-reports-nio2</reportsDirectory>
- <excludes>
- <exclude>**/*LoadTest.java</exclude>
- </excludes>
</configuration>
</plugin>
</plugins>
diff --git
a/sshd-core/src/main/java/org/apache/sshd/client/channel/ClientChannel.java
b/sshd-core/src/main/java/org/apache/sshd/client/channel/ClientChannel.java
index 6bd15dc..7897ba7 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/channel/ClientChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/channel/ClientChannel.java
@@ -31,6 +31,7 @@ import org.apache.sshd.client.future.OpenFuture;
import org.apache.sshd.client.session.ClientSession;
import org.apache.sshd.client.session.ClientSessionHolder;
import org.apache.sshd.common.channel.Channel;
+import org.apache.sshd.common.channel.StreamingChannel;
import org.apache.sshd.common.io.IoInputStream;
import org.apache.sshd.common.io.IoOutputStream;
@@ -41,11 +42,7 @@ import org.apache.sshd.common.io.IoOutputStream;
*
* @author <a href="mailto:[email protected]">Apache MINA SSHD Project</a>
*/
-public interface ClientChannel extends Channel, ClientSessionHolder {
- enum Streaming {
- Async,
- Sync
- }
+public interface ClientChannel extends Channel, StreamingChannel,
ClientSessionHolder {
@Override
default ClientSession getClientSession() {
@@ -57,10 +54,6 @@ public interface ClientChannel extends Channel,
ClientSessionHolder {
*/
String getChannelType();
- Streaming getStreaming();
-
- void setStreaming(Streaming streaming);
-
IoOutputStream getAsyncIn();
IoInputStream getAsyncOut();
diff --git
a/sshd-core/src/main/java/org/apache/sshd/common/channel/SimpleIoOutputStream.java
b/sshd-core/src/main/java/org/apache/sshd/common/channel/SimpleIoOutputStream.java
new file mode 100644
index 0000000..6fee66a
--- /dev/null
+++
b/sshd-core/src/main/java/org/apache/sshd/common/channel/SimpleIoOutputStream.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.channel;
+
+import java.io.IOException;
+
+import org.apache.sshd.common.io.AbstractIoWriteFuture;
+import org.apache.sshd.common.io.IoOutputStream;
+import org.apache.sshd.common.io.IoWriteFuture;
+import org.apache.sshd.common.util.buffer.Buffer;
+import org.apache.sshd.common.util.closeable.AbstractCloseable;
+import org.apache.sshd.common.util.io.IoUtils;
+import org.apache.sshd.server.forward.TcpipServerChannel;
+
+/**
+ * An implementation of {@link IoOutputStream} using a synchronous {@link
ChannelOutputStream}.
+ *
+ * @author <a href="mailto:[email protected]">Apache MINA SSHD
Project</a>
+ */
+public class SimpleIoOutputStream extends AbstractCloseable implements
IoOutputStream {
+
+ protected final ChannelOutputStream os;
+
+ public SimpleIoOutputStream(ChannelOutputStream os) {
+ this.os = os;
+ }
+
+ @Override
+ protected void doCloseImmediately() {
+ IoUtils.closeQuietly(os);
+ super.doCloseImmediately();
+ }
+
+ @Override
+ public IoWriteFuture writePacket(Buffer buffer) throws IOException {
+ os.write(buffer.array(), buffer.rpos(), buffer.available());
+ os.flush();
+ DefaultIoWriteFuture f = new DefaultIoWriteFuture(this, null);
+ f.setValue(true);
+ return f;
+ }
+
+ protected static class DefaultIoWriteFuture extends AbstractIoWriteFuture {
+
+ public DefaultIoWriteFuture(Object id, Object lock) {
+ super(id, lock);
+ }
+
+ }
+
+}
diff --git
a/sshd-core/src/main/java/org/apache/sshd/common/channel/StreamingChannel.java
b/sshd-core/src/main/java/org/apache/sshd/common/channel/StreamingChannel.java
new file mode 100644
index 0000000..e2d7b94
--- /dev/null
+++
b/sshd-core/src/main/java/org/apache/sshd/common/channel/StreamingChannel.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.channel;
+
+/**
+ * A channel that can be either configured to use synchronous or asynchrounous
streams.
+ *
+ * @author <a href="mailto:[email protected]">Apache MINA SSHD
Project</a>
+ */
+public interface StreamingChannel {
+
+ enum Streaming {
+ Async,
+ Sync
+ }
+
+ Streaming getStreaming();
+
+ void setStreaming(Streaming streaming);
+
+}
diff --git
a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
index 84d0468..2200ba2 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
@@ -70,6 +70,9 @@ public class Nio2Session extends AbstractCloseable implements
IoSession {
private final AtomicLong lastReadCycleStart = new AtomicLong();
private final AtomicLong writeCyclesCounter = new AtomicLong();
private final AtomicLong lastWriteCycleStart = new AtomicLong();
+ private final Object suspendLock = new Object();
+ private volatile boolean suspend;
+ private volatile Runnable readRunnable;
public Nio2Session(
Nio2Service service, FactoryManager manager, IoHandler
handler, AsynchronousSocketChannel socket,
@@ -382,7 +385,43 @@ public class Nio2Session extends AbstractCloseable
implements IoSession {
exceptionCaught(exc);
}
+ @Override
+ public void suspendRead() {
+ log.trace("suspendRead({})", this);
+ boolean prev = suspend;
+ suspend = true;
+ if (!prev) {
+ log.debug("suspendRead({}) requesting read suspension", this);
+ }
+ }
+
+ @Override
+ public void resumeRead() {
+ log.trace("resumeRead({})", this);
+ if (suspend) {
+ Runnable runnable;
+ synchronized (suspendLock) {
+ suspend = false;
+ runnable = readRunnable;
+ }
+ if (runnable != null) {
+ log.debug("resumeRead({}) resuming read", this);
+ runnable.run();
+ }
+ }
+ }
+
protected void doReadCycle(ByteBuffer buffer,
Nio2CompletionHandler<Integer, Object> completion) {
+ if (suspend) {
+ log.debug("doReadCycle({}) suspending reading", this);
+ synchronized (suspendLock) {
+ if (suspend) {
+ readRunnable = () -> doReadCycle(buffer, completion);
+ return;
+ }
+ }
+ }
+
AsynchronousSocketChannel socket = getSocket();
Duration readTimeout =
CoreModuleProperties.NIO2_READ_TIMEOUT.getRequired(manager);
readCyclesCounter.incrementAndGet();
diff --git
a/sshd-core/src/main/java/org/apache/sshd/core/CoreModuleProperties.java
b/sshd-core/src/main/java/org/apache/sshd/core/CoreModuleProperties.java
index 828bf72..d728c3e 100644
--- a/sshd-core/src/main/java/org/apache/sshd/core/CoreModuleProperties.java
+++ b/sshd-core/src/main/java/org/apache/sshd/core/CoreModuleProperties.java
@@ -678,6 +678,23 @@ public final class CoreModuleProperties {
public static final Property<String> X11_BIND_HOST
= Property.string("x11-fwd-bind-host",
SshdSocketAddress.LOCALHOST_IPV4);
+ /**
+ * Configuration value for the {@link
org.apache.sshd.server.forward.TcpipServerChannel} to control the higher
+ * theshold for the data to be buffered waiting to be sent. If the
buffered data size reaches this value, the
+ * session will pause reading until the data length goes below the
+ * {@link #TCPIP_SERVER_CHANNEL_BUFFER_SIZE_THRESHOLD_LOW} threshold.
+ */
+ public static final Property<Long>
TCPIP_SERVER_CHANNEL_BUFFER_SIZE_THRESHOLD_HIGH
+ =
Property.long_("tcpip-server-channel-buffer-size-threshold-high", 1024 * 1024);
+
+ /**
+ * The lower threshold. If not set, half the higher threshold will be used.
+ *
+ * @see #TCPIP_SERVER_CHANNEL_BUFFER_SIZE_THRESHOLD_HIGH
+ */
+ public static final Property<Long>
TCPIP_SERVER_CHANNEL_BUFFER_SIZE_THRESHOLD_LOW
+ = Property.long_("tcpip-server-channel-buffer-size-threshold-low");
+
private CoreModuleProperties() {
throw new UnsupportedOperationException("No instance");
}
diff --git
a/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
b/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
index a64eaf3..e14775a 100644
---
a/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
+++
b/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
@@ -23,6 +23,7 @@ import java.net.ConnectException;
import java.net.SocketAddress;
import java.util.Collections;
import java.util.Objects;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.sshd.client.future.DefaultOpenFuture;
import org.apache.sshd.client.future.OpenFuture;
@@ -34,17 +35,22 @@ import
org.apache.sshd.common.channel.BufferedIoOutputStream;
import org.apache.sshd.common.channel.Channel;
import org.apache.sshd.common.channel.ChannelAsyncOutputStream;
import org.apache.sshd.common.channel.ChannelFactory;
+import org.apache.sshd.common.channel.ChannelOutputStream;
+import org.apache.sshd.common.channel.SimpleIoOutputStream;
+import org.apache.sshd.common.channel.StreamingChannel;
import org.apache.sshd.common.channel.Window;
import org.apache.sshd.common.channel.exception.SshChannelOpenException;
import org.apache.sshd.common.forward.Forwarder;
import org.apache.sshd.common.forward.ForwardingTunnelEndpointsProvider;
import org.apache.sshd.common.future.CloseFuture;
+import org.apache.sshd.common.future.SshFutureListener;
import org.apache.sshd.common.io.IoConnectFuture;
import org.apache.sshd.common.io.IoConnector;
import org.apache.sshd.common.io.IoHandler;
import org.apache.sshd.common.io.IoOutputStream;
import org.apache.sshd.common.io.IoServiceFactory;
import org.apache.sshd.common.io.IoSession;
+import org.apache.sshd.common.io.IoWriteFuture;
import org.apache.sshd.common.session.Session;
import org.apache.sshd.common.util.GenericUtils;
import org.apache.sshd.common.util.Readable;
@@ -56,6 +62,7 @@ import org.apache.sshd.common.util.net.SshdSocketAddress;
import org.apache.sshd.common.util.threads.CloseableExecutorService;
import org.apache.sshd.common.util.threads.ExecutorServiceCarrier;
import org.apache.sshd.common.util.threads.ThreadUtils;
+import org.apache.sshd.core.CoreModuleProperties;
import org.apache.sshd.server.channel.AbstractServerChannel;
import org.apache.sshd.server.forward.TcpForwardingFilter.Type;
@@ -64,7 +71,7 @@ import
org.apache.sshd.server.forward.TcpForwardingFilter.Type;
*
* @author <a href="mailto:[email protected]">Apache MINA SSHD Project</a>
*/
-public class TcpipServerChannel extends AbstractServerChannel implements
ForwardingTunnelEndpointsProvider {
+public class TcpipServerChannel extends AbstractServerChannel implements
StreamingChannel, ForwardingTunnelEndpointsProvider {
public abstract static class TcpipFactory implements ChannelFactory,
ExecutorServiceCarrier {
@@ -102,6 +109,8 @@ public class TcpipServerChannel extends
AbstractServerChannel implements Forward
private SshdSocketAddress tunnelExit;
private SshdSocketAddress originatorAddress;
private SocketAddress localAddress;
+ private final AtomicLong inFlightDataSize = new AtomicLong();
+ private Streaming streaming = Streaming.Sync;
public TcpipServerChannel(ForwardingFilter.Type type,
CloseableExecutorService executor) {
super("", Collections.emptyList(), executor);
@@ -121,6 +130,16 @@ public class TcpipServerChannel extends
AbstractServerChannel implements Forward
}
@Override
+ public Streaming getStreaming() {
+ return streaming;
+ }
+
+ @Override
+ public void setStreaming(Streaming streaming) {
+ this.streaming = streaming;
+ }
+
+ @Override
public SshdSocketAddress getTunnelEntrance() {
return tunnelEntrance;
}
@@ -195,19 +214,27 @@ public class TcpipServerChannel extends
AbstractServerChannel implements Forward
throw new RuntimeSshException(e);
}
- out = new BufferedIoOutputStream(
- "tcpip channel", new ChannelAsyncOutputStream(this,
SshConstants.SSH_MSG_CHANNEL_DATA) {
- @SuppressWarnings("synthetic-access")
- @Override
- protected CloseFuture doCloseGracefully() {
- try {
- sendEof();
- } catch (IOException e) {
- session.exceptionCaught(e);
- }
- return super.doCloseGracefully();
+ if (streaming == Streaming.Async) {
+ out = new BufferedIoOutputStream(
+ "tcpip channel", new ChannelAsyncOutputStream(this,
SshConstants.SSH_MSG_CHANNEL_DATA) {
+ @SuppressWarnings("synthetic-access")
+ @Override
+ protected CloseFuture doCloseGracefully() {
+ try {
+ sendEof();
+ } catch (IOException e) {
+ session.exceptionCaught(e);
}
- });
+ return super.doCloseGracefully();
+ }
+ });
+ } else {
+ this.out = new SimpleIoOutputStream(new ChannelOutputStream(
+ this, getRemoteWindow(), log,
SshConstants.SSH_MSG_CHANNEL_DATA, true));
+
+ }
+ long thresholdHigh =
CoreModuleProperties.TCPIP_SERVER_CHANNEL_BUFFER_SIZE_THRESHOLD_HIGH.getRequired(this);
+ long thresholdLow =
CoreModuleProperties.TCPIP_SERVER_CHANNEL_BUFFER_SIZE_THRESHOLD_LOW.get(this).orElse(thresholdHigh
/ 2);
IoHandler handler = new IoHandler() {
@Override
@SuppressWarnings("synthetic-access")
@@ -217,9 +244,23 @@ public class TcpipServerChannel extends
AbstractServerChannel implements Forward
log.debug("doInit({}) Ignoring write to channel in
CLOSING state", TcpipServerChannel.this);
}
} else {
- Buffer buffer = new ByteArrayBuffer(message.available(),
false);
+ int length = message.available();
+ Buffer buffer = new ByteArrayBuffer(length, false);
buffer.putBuffer(message);
- out.writePacket(buffer);
+ long total = inFlightDataSize.addAndGet(length);
+ if (total > thresholdHigh) {
+ session.suspendRead();
+ }
+ IoWriteFuture ioWriteFuture = out.writePacket(buffer);
+ ioWriteFuture.addListener(new
SshFutureListener<IoWriteFuture>() {
+ @Override
+ public void operationComplete(IoWriteFuture future) {
+ long total = inFlightDataSize.addAndGet(-length);
+ if (total <= thresholdLow) {
+ session.resumeRead();
+ }
+ }
+ });
}
}
diff --git a/sshd-core/src/test/java/org/apache/sshd/LoadTest.java
b/sshd-core/src/test/java/org/apache/sshd/LoadTest.java
index 4948fe0..7afae32 100644
--- a/sshd-core/src/test/java/org/apache/sshd/LoadTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/LoadTest.java
@@ -127,8 +127,6 @@ public class LoadTest extends BaseTestSupport {
try (SshClient client = setupTestFullSupportClient()) {
CoreModuleProperties.MAX_PACKET_SIZE.set(client, 1024L * 16);
CoreModuleProperties.WINDOW_SIZE.set(client, 1024L * 8);
-
client.setKeyExchangeFactories(Collections.singletonList(ClientBuilder.DH2KEX.apply(BuiltinDHFactories.dhg1)));
-
client.setCipherFactories(Collections.singletonList(BuiltinCiphers.blowfishcbc));
client.start();
try (ClientSession session
= client.connect(getCurrentTestName(), TEST_LOCALHOST,
port).verify(CONNECT_TIMEOUT).getSession()) {