This is an automated email from the ASF dual-hosted git repository. twolf pushed a commit to branch dev_3.0 in repository https://gitbox.apache.org/repos/asf/mina-sshd.git
commit 812c2e838d7b1af21af8dc1051e5516f72dedf36 Author: Thomas Wolf <tw...@apache.org> AuthorDate: Sun Apr 6 14:19:06 2025 +0200 Move SSH_MSG_GLOBAL_REQUEST handling to the ConnectionService Now that all the SSH transport stuff has gone from AbstractSession, it's evident that the session contains something it shouldn't: global requests are a feature of the connection service. So move the whole implementation into the AbstractConnectionService. Keep the convenience request() methods on the session, but just forward to the service, if the current service _is_ a ConnectionService. --- .../sshd/common/session/ConnectionService.java | 79 ++++++ .../common/session/filters/SshTransportFilter.java | 24 +- .../sshd/common/session/filters/kex/KexFilter.java | 6 +- .../session/helpers/AbstractConnectionService.java | 294 +++++++++++++++++++- .../common/session/helpers/AbstractSession.java | 295 ++------------------- 5 files changed, 405 insertions(+), 293 deletions(-) diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/ConnectionService.java b/sshd-core/src/main/java/org/apache/sshd/common/session/ConnectionService.java index 4340da3d0..67bb13f3f 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/session/ConnectionService.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/session/ConnectionService.java @@ -19,6 +19,9 @@ package org.apache.sshd.common.session; import java.io.IOException; +import java.time.Duration; +import java.util.Objects; +import java.util.concurrent.TimeUnit; import org.apache.sshd.agent.common.AgentForwardSupport; import org.apache.sshd.common.Service; @@ -26,6 +29,10 @@ import org.apache.sshd.common.channel.Channel; import org.apache.sshd.common.forward.Forwarder; import org.apache.sshd.common.forward.PortForwardingEventListenerManager; import org.apache.sshd.common.forward.PortForwardingEventListenerManagerHolder; +import org.apache.sshd.common.future.GlobalRequestFuture; +import org.apache.sshd.common.future.GlobalRequestFuture.ReplyHandler; +import org.apache.sshd.common.util.ValidateUtils; +import org.apache.sshd.common.util.buffer.Buffer; import org.apache.sshd.server.x11.X11ForwardSupport; /** @@ -63,6 +70,78 @@ public interface ConnectionService */ Forwarder getForwarder(); + /** + * Send a global request and wait for the response, if the request is sent with {@code want-reply = true}. + * + * @param request the request name - used mainly for logging and debugging + * @param buffer the buffer containing the global request + * @param timeout The number of time units to wait - must be <U>positive</U> + * @param unit The {@link TimeUnit} to wait for the response + * @return the return buffer if the request was successful, {@code null} otherwise. + * @throws IOException if an error occurred when encoding or sending the packet + * @throws java.net.SocketTimeoutException If no response received within specified timeout + */ + default Buffer request(String request, Buffer buffer, long timeout, TimeUnit unit) throws IOException { + ValidateUtils.checkTrue(timeout > 0L, "Non-positive timeout requested: %d", timeout); + return request(request, buffer, TimeUnit.MILLISECONDS.convert(timeout, unit)); + } + + /** + * + * Send a global request and wait for the response, if the request is sent with {@code want-reply = true}. + * + * @param request the request name - used mainly for logging and debugging + * @param buffer the buffer containing the global request + * @param timeout The (never {@code null}) timeout to wait - its milliseconds value is used + * @return the return buffer if the request was successful, {@code null} otherwise. + * @throws IOException if an error occurred when encoding or sending the packet + * @throws java.net.SocketTimeoutException If no response received within specified timeout + */ + default Buffer request(String request, Buffer buffer, Duration timeout) throws IOException { + Objects.requireNonNull(timeout, "No timeout specified"); + return request(request, buffer, timeout.toMillis()); + } + + /** + * Send a global request and wait for the response, if the request is sent with {@code want-reply = true}. + * + * @param request the request name - used mainly for logging and debugging + * @param buffer the buffer containing the global request + * @param maxWaitMillis maximum time in milliseconds to wait for the request to finish - must be + * <U>positive</U> + * @return the return buffer if the request was successful, {@code null} otherwise. + * @throws IOException if an error occurred when encoding or sending the packet + * @throws java.net.SocketTimeoutException If no response received within specified timeout + */ + Buffer request(String request, Buffer buffer, long maxWaitMillis) throws IOException; + + /** + * Send a global request and handle the reply asynchronously. If {@code want-reply = true}, pass the received + * {@link Buffer} to the given {@link ReplyHandler}, which may execute in a different thread. + * + * <dl> + * <dt>want-reply == true && replyHandler != null</dt> + * <dd>The returned future is fulfilled with {@code null} when the request was sent, or with an exception if the + * request could not be sent. The {@code replyHandler} is invoked once the reply is received, with the SSH reply + * code and the data received.</dd> + * <dt>want-reply == true && replyHandler == null</dt> + * <dd>The returned future is fulfilled with an exception if the request could not be sent, or a failure reply was + * received. If a success reply was received, the future is fulfilled with the received data buffer.</dd> + * <dt>want-reply == false</dt> + * <dd>The returned future is fulfilled with an empty {@link Buffer} when the request was sent, or with an exception + * if the request could not be sent. If a reply handler is given, it is invoked with that empty buffer. The handler + * is not invoked if sending the request failed.</dd> + * </dl> + * + * @param buffer the {@link Buffer} containing the global request, with the {@code want-reply} flag set as + * appropriate + * @param request the request name + * @param replyHandler {@link ReplyHandler} for handling the reply; may be {@code null} + * @return Created {@link GlobalRequestFuture} + * @throws IOException if an error occurred while encoding or sending the packet + */ + GlobalRequestFuture request(Buffer buffer, String request, ReplyHandler replyHandler) throws IOException; + // TODO: remove from interface, it's server side only AgentForwardSupport getAgentForwardSupport(); diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/filters/SshTransportFilter.java b/sshd-core/src/main/java/org/apache/sshd/common/session/filters/SshTransportFilter.java index b6c742298..2223b29a8 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/session/filters/SshTransportFilter.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/session/filters/SshTransportFilter.java @@ -55,16 +55,15 @@ public class SshTransportFilter extends IoFilter { /** * Creates a new SSH transport filter. * - * @param session {@link AbstractSession} this filter is for - * @param random {@link Random} instance to use - * @param identities {@link SshIdentHandler} for handling the SSH identificaton string - * @param events {@link SessionListener} to report some events - * @param cryptListener {@link EncryptionListener} called just before a buffer is encrypted - * @param proposer {@link Proposer} to get KEX proposals - * @param checker {@link HostKeyChecker} to check the peer's host key; may be {@code null} if on a server + * @param session {@link AbstractSession} this filter is for + * @param random {@link Random} instance to use + * @param identities {@link SshIdentHandler} for handling the SSH identificaton string + * @param events {@link SessionListener} to report some events + * @param proposer {@link Proposer} to get KEX proposals + * @param checker {@link HostKeyChecker} to check the peer's host key; may be {@code null} if on a server */ public SshTransportFilter(AbstractSession session, Random random, SshIdentHandler identities, SessionListener events, - EncryptionListener cryptListener, Proposer proposer, HostKeyChecker checker) { + Proposer proposer, HostKeyChecker checker) { IdentFilter ident = new IdentFilter(); ident.setPropertyResolver(session); ident.setIdentHandler(identities); @@ -73,7 +72,6 @@ public class SshTransportFilter extends IoFilter { cryptFilter = new CryptFilter(); cryptFilter.setSession(session); cryptFilter.setRandom(random); - cryptFilter.addEncryptionListener(cryptListener); filters.addLast(cryptFilter); compressionFilter = new CompressionFilter(); @@ -159,6 +157,14 @@ public class SshTransportFilter extends IoFilter { kexFilter.removeKexListener(listener); } + public void addEncryptionListener(EncryptionListener listener) { + cryptFilter.addEncryptionListener(listener); + } + + public void removeEncryptionListener(EncryptionListener listener) { + cryptFilter.removeEncryptionListener(listener); + } + public boolean isSecure() { return cryptFilter.isSecure(); } diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/filters/kex/KexFilter.java b/sshd-core/src/main/java/org/apache/sshd/common/session/filters/kex/KexFilter.java index 09a19719e..d72fe506b 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/session/filters/kex/KexFilter.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/session/filters/kex/KexFilter.java @@ -1198,16 +1198,16 @@ public class KexFilter extends IoFilter { private boolean isWantReply(Buffer message, boolean isChannelRequest) { boolean wantReply = false; - int pos = message.rpos(); + int mark = message.rpos(); message.getUByte(); if (isChannelRequest) { message.getUInt(); // Skip the channel id } long length = message.getUInt(); if (length < message.available()) { - wantReply = message.rawByte(pos + 5 + (int) length) != 0; + wantReply = message.rawByte(message.rpos() + (int) length) != 0; } - message.rpos(pos); + message.rpos(mark); return wantReply; } diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java index 9251d651d..2580e5e35 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java @@ -19,13 +19,17 @@ package org.apache.sshd.common.session.helpers; import java.io.IOException; +import java.io.InterruptedIOException; +import java.net.SocketTimeoutException; import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Deque; import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -34,6 +38,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.IntUnaryOperator; +import java.util.function.LongConsumer; import org.apache.sshd.agent.common.AgentForwardSupport; import org.apache.sshd.agent.common.DefaultAgentForwardSupport; @@ -42,6 +47,7 @@ import org.apache.sshd.client.future.OpenFuture; import org.apache.sshd.common.Closeable; import org.apache.sshd.common.FactoryManager; import org.apache.sshd.common.SshConstants; +import org.apache.sshd.common.SshException; import org.apache.sshd.common.channel.Channel; import org.apache.sshd.common.channel.ChannelFactory; import org.apache.sshd.common.channel.ChannelListener; @@ -53,7 +59,9 @@ import org.apache.sshd.common.forward.Forwarder; import org.apache.sshd.common.forward.ForwarderFactory; import org.apache.sshd.common.forward.PortForwardingEventListener; import org.apache.sshd.common.forward.PortForwardingEventListenerManager; +import org.apache.sshd.common.future.GlobalRequestFuture; import org.apache.sshd.common.future.HasException; +import org.apache.sshd.common.global.GlobalRequestException; import org.apache.sshd.common.io.AbstractIoWriteFuture; import org.apache.sshd.common.io.IoWriteFuture; import org.apache.sshd.common.kex.KexState; @@ -61,10 +69,12 @@ import org.apache.sshd.common.session.ConnectionService; import org.apache.sshd.common.session.ReservedSessionMessagesHandler; import org.apache.sshd.common.session.Session; import org.apache.sshd.common.session.UnknownChannelReferenceHandler; +import org.apache.sshd.common.session.filters.CryptFilter.EncryptionListener; import org.apache.sshd.common.util.EventListenerUtils; import org.apache.sshd.common.util.GenericUtils; import org.apache.sshd.common.util.ValidateUtils; import org.apache.sshd.common.util.buffer.Buffer; +import org.apache.sshd.common.util.buffer.ByteArrayBuffer; import org.apache.sshd.common.util.closeable.AbstractInnerCloseable; import org.apache.sshd.common.util.functors.Int2IntFunction; import org.apache.sshd.core.CoreModuleProperties; @@ -78,7 +88,7 @@ import org.apache.sshd.server.x11.X11ForwardSupport; */ public abstract class AbstractConnectionService extends AbstractInnerCloseable - implements ConnectionService { + implements ConnectionService, ReservedSessionMessagesHandler { /** * Default growth factor function used to resize response buffers @@ -110,6 +120,45 @@ public abstract class AbstractConnectionService private final AbstractSession sessionInstance; private UnknownChannelReferenceHandler unknownChannelReferenceHandler; + /** + * Used to wait for results of global requests sent with {@code want-reply = true}. Note that per RFC 4254, global + * requests may be sent at any time, but success/failure replies MUST come in the order the requests were sent. Some + * implementations may also reply with SSH_MSG_UNIMPLEMENTED, on which RFC 4253 says they must be sent in the order + * the message was received. + * <p> + * This implies that it is legal to send "nested" global requests: a client or server may send two (or more) global + * requests, and then receives two (or more) replies in the correct order: first reply for the first request sent; + * second reply for the second request sent. + * </p> + * <p> + * We keep a FIFO list of pending global requests for which we expect a reply. We always add new global requests at + * the head. For success and failure replies, which don't identify the message sequence number of the global + * request, we apply the reply to the tail of the list. For unimplemented messages, we apply it to the request + * identified by the message sequence number, which normally also should be the tail. + * </p> + * <p> + * When a reply is received, the corresponding global request is removed from the list. + * </p> + * <p> + * Global requests sent with {@code want-reply = false} are never added to this list; they are fire-and-forget. + * According to the SSH RFCs, the peer MUST not reply on a message with {@code want-reply = false}. If it does so + * all the same, it is broken. We might then apply the result to the wrong pending global request if we have any. + * </p> + * + * @see <a href="https://tools.ietf.org/html/rfc4254#section-4">RFC 4254: Global Requests</a> + * @see <a href="https://tools.ietf.org/html/rfc4253#section-11.4">RFC 4254: Reserved Messages</a> + * @see #request(Buffer, String, org.apache.sshd.common.future.GlobalRequestFuture.ReplyHandler) + * @see #requestSuccess(Buffer) + * @see #requestFailure(Buffer) + * @see #doInvokeUnimplementedMessageHandler(int, Buffer) + * @see #preClose() + */ + private final Deque<GlobalRequestFuture> pendingGlobalRequests = new ConcurrentLinkedDeque<>(); + + private final Map<Buffer, LongConsumer> globalSequenceNumbers = new ConcurrentHashMap<>(); + + private EncryptionListener sequenceListener; + protected AbstractConnectionService(AbstractSession session) { sessionInstance = Objects.requireNonNull(session, "No session"); listenerProxy = EventListenerUtils.proxyWrapper(PortForwardingEventListener.class, listeners); @@ -180,6 +229,14 @@ public abstract class AbstractConnectionService @Override public void start() { heartBeat = startHeartBeat(); + sequenceListener = (buffer, sequenceNumber) -> { + // SSHD-968 - remember global request outgoing sequence number + LongConsumer setter = globalSequenceNumbers.remove(buffer); + if (setter != null) { + setter.accept(sequenceNumber); + } + }; + sessionInstance.getTransport().addEncryptionListener(sequenceListener); } protected synchronized ScheduledFuture<?> startHeartBeat() { @@ -318,8 +375,201 @@ public abstract class AbstractConnectionService return forwarder; } + private boolean wantReply(Buffer buffer) { + // Examine the buffer to get the want-reply flag + int rpos = buffer.rpos(); + buffer.getByte(); // Skip command + buffer.getString(); // Skip request name + boolean replyFlag = buffer.getBoolean(); + buffer.rpos(rpos); // reset buffer + return replyFlag; + } + + @Override + public Buffer request(String request, Buffer buffer, long maxWaitMillis) throws IOException { + ValidateUtils.checkTrue(maxWaitMillis > 0, + "Requested timeout for " + request + " is not strictly greater than zero: " + maxWaitMillis); + boolean debugEnabled = log.isDebugEnabled(); + boolean withReply = wantReply(buffer); + GlobalRequestFuture future = request(buffer, request, null); + Object result; + boolean done = false; + try { + if (debugEnabled) { + log.debug("request({}) request={}, timeout={}ms", this, request, maxWaitMillis); + } + done = future.await(maxWaitMillis); + result = future.getValue(); + } catch (InterruptedIOException e) { + throw (InterruptedIOException) new InterruptedIOException( + "Interrupted while waiting for request=" + request + " result").initCause(e); + } + + if (!isOpen()) { + throw new IOException("Session was closed or closing while awaiting reply for request=" + request); + } + + if (withReply) { + if (debugEnabled) { + log.debug("request({}) request={}, timeout={}ms, requestSeqNo={}, done {}, result received={}", this, request, + maxWaitMillis, future.getSequenceNumber(), done, result instanceof Buffer); + } + + if (!done || result == null) { + throw new SocketTimeoutException("No response received after " + maxWaitMillis + "ms for request=" + request); + } + // The operation is specified to return null if the request could be made, but got an error reply. + // The caller cannot distinguish between SSH_MSG_UNIMPLEMENTED and SSH_MSG_REQUEST_FAILURE. + if (result instanceof GlobalRequestException) { + if (debugEnabled) { + log.debug("request({}) request={}, requestSeqNo={}: received={}", this, request, future.getSequenceNumber(), + SshConstants.getCommandMessageName(((GlobalRequestException) result).getCode())); + } + return null; + } + } + + if (result instanceof Throwable) { + throw new IOException("Exception on request " + request, (Throwable) result); + } + if (result instanceof Buffer) { + return (Buffer) result; + } + return null; + } + + @Override + public GlobalRequestFuture request(Buffer buffer, String request, GlobalRequestFuture.ReplyHandler replyHandler) + throws IOException { + GlobalRequestFuture globalRequest; + if (!wantReply(buffer)) { + if (!isOpen()) { + throw new IOException("Global request " + request + ": session is closing or closed."); + } + // Fire-and-forget global requests (want-reply = false) are always allowed; we don't need to register the + // future, nor do we have to wait for anything. Client code can wait on the returned future if it wants to + // be sure the message has been sent. + globalRequest = new GlobalRequestFuture(request, replyHandler) { + + @Override + public void operationComplete(IoWriteFuture future) { + if (future.isWritten()) { + if (log.isDebugEnabled()) { + log.debug("makeGlobalRequest({})[{}] want-reply=false sent", this, getId()); + } + setValue(new ByteArrayBuffer(new byte[0])); + GlobalRequestFuture.ReplyHandler handler = getHandler(); + if (handler != null) { + handler.accept(SshConstants.SSH_MSG_REQUEST_SUCCESS, getBuffer()); + } + } + super.operationComplete(future); + } + }; + sessionInstance.writePacket(buffer).addListener(globalRequest); + return globalRequest; + } + // We do expect a reply. The packet may get queued or otherwise delayed for an unknown time. We must + // consider this request pending only once its sequence number is known. If sending the message fails, + // the writeFuture will set an exception on the globalRequest, or will fail it. + globalRequest = new GlobalRequestFuture(request, replyHandler) { + + @Override + public void operationComplete(IoWriteFuture future) { + if (!future.isWritten()) { + // If it was not written after all, make sure it's not considered pending anymore. + pendingGlobalRequests.removeFirstOccurrence(this); + } + // Super call will fulfill the future if not written + super.operationComplete(future); + if (future.isWritten() && getHandler() != null) { + // Fulfill this future now. The GlobalRequestFuture can thus be used to wait for the + // successful sending of the request, the framework will invoke the handler whenever + // the reply arrives. The buffer cannot be obtained though the future. + setValue(null); + } + } + }; + if (!isOpen()) { + throw new IOException("Global request " + request + ": session is closing or closed."); + } + // This consumer will be invoked once before the packet actually goes out. Some servers respond to global + // requests with SSH_MSG_UNIMPLEMENTED instead of SSH_MSG_REQUEST_FAILURE (see SSHD-968), so we need to make + // sure we do know the sequence number. + globalSequenceNumbers.put(buffer, seqNo -> { + globalRequest.setSequenceNumber(seqNo); + if (log.isDebugEnabled()) { + log.debug("makeGlobalRequest({})[{}] want-reply=true with seqNo={}", this, globalRequest.getId(), seqNo); + } + // Insert at front + pendingGlobalRequests.push(globalRequest); + }); + sessionInstance.writePacket(buffer).addListener(f -> { + Throwable t = f.getException(); + if (t != null) { + // Just in case we get an exception before preProcessEncodeBuffer was even called + globalSequenceNumbers.remove(buffer); + } + }).addListener(globalRequest); // Report errors through globalRequest, fulfilling globalRequest + return globalRequest; + } + + @Override + public boolean handleUnimplementedMessage(Session session, int cmd, Buffer buffer) throws Exception { + /* + * SSHD-968 Some servers respond to global requests with SSH_MSG_UNIMPLEMENTED instead of + * SSH_MSG_REQUEST_FAILURE (as mandated by https://tools.ietf.org/html/rfc4254#section-4) so deal with it. + */ + if (!pendingGlobalRequests.isEmpty()) { + // We do have ongoing global requests. + long msgSeqNo = buffer.rawUInt(buffer.rpos()); + + // Find the global request this applies to + GlobalRequestFuture future = pendingGlobalRequests.stream().filter(f -> f.getSequenceNumber() == msgSeqNo).findAny() + .orElse(null); + if (future != null && pendingGlobalRequests.removeFirstOccurrence(future)) { + // This SSH_MSG_UNIMPLEMENTED was the reply to a global request. + if (log.isDebugEnabled()) { + log.debug("doInvokeUnimplementedMessageHandler({}) report global request={} failure for seqNo={}", this, + future.getId(), msgSeqNo); + } + GlobalRequestFuture.ReplyHandler handler = future.getHandler(); + if (handler != null) { + Buffer resultBuf = ByteArrayBuffer.getCompactClone(buffer.array(), buffer.rpos(), buffer.available()); + handler.accept(cmd, resultBuf); + } else { + future.setValue(new GlobalRequestException(cmd)); + } + return true; // message handled internally + } else if (future != null) { + // The SSH_MSG_UNIMPLEMENTED was for a global request, but that request is no longer in the list: it + // got terminated otherwise. + return true; + } + if (log.isTraceEnabled()) { + log.trace( + "doInvokeUnimplementedMessageHandler({}) SSH_MSG_UNIMPLEMENTED with message seqNo={} not for a global request", + this, msgSeqNo); + } + } + return false; + } + @Override protected void preClose() { + // If anyone is waiting for a global response notify them about the closing session + boolean debugEnabled = log.isDebugEnabled(); + for (;;) { + GlobalRequestFuture future = pendingGlobalRequests.pollLast(); + if (future == null) { + break; + } + if (debugEnabled) { + log.debug("preClose({}): Session closing; failing still pending global request {}", this, future.getId()); + } + future.setValue(new SshException("Session is closing")); + } + stopHeartBeat(); this.listeners.clear(); this.managersHolder.clear(); @@ -390,6 +640,7 @@ public abstract class AbstractConnectionService return builder() .sequential(forwarderHolder.get(), agentForwardHolder.get(), x11ForwardHolder.get()) .parallel(toString(), getChannels()) + .run(this, () -> sessionInstance.getTransport().removeEncryptionListener(sequenceListener)) .build(); } @@ -912,14 +1163,47 @@ public abstract class AbstractConnectionService return session.writePacket(rsp); } + /** + * Indicates the reception of a {@code SSH_MSG_REQUEST_SUCCESS} message + * + * @param buffer The {@link Buffer} containing the message data + * @throws Exception If failed to handle the message + */ protected void requestSuccess(Buffer buffer) throws Exception { - AbstractSession s = getSession(); - s.requestSuccess(buffer); + sessionInstance.resetIdleTimeout(); + // Remove at end + GlobalRequestFuture request = pendingGlobalRequests.pollLast(); + if (request != null) { + // use a copy of the original data in case it is re-used on return + Buffer resultBuf = ByteArrayBuffer.getCompactClone(buffer.array(), buffer.rpos(), buffer.available()); + GlobalRequestFuture.ReplyHandler handler = request.getHandler(); + if (handler != null) { + handler.accept(SshConstants.SSH_MSG_REQUEST_SUCCESS, resultBuf); + } else { + request.setValue(resultBuf); + } + } } + /** + * Indicates the reception of a {@code SSH_MSG_REQUEST_FAILURE} message + * + * @param buffer The {@link Buffer} containing the message data + * @throws Exception If failed to handle the message + */ protected void requestFailure(Buffer buffer) throws Exception { - AbstractSession s = getSession(); - s.requestFailure(buffer); + sessionInstance.resetIdleTimeout(); + // Remove at end + GlobalRequestFuture request = pendingGlobalRequests.pollLast(); + if (request != null) { + GlobalRequestFuture.ReplyHandler handler = request.getHandler(); + if (handler != null) { + Buffer resultBuf = ByteArrayBuffer.getCompactClone(buffer.array(), buffer.rpos(), buffer.available()); + handler.accept(SshConstants.SSH_MSG_REQUEST_FAILURE, resultBuf); + } else { + request.setValue(new GlobalRequestException(SshConstants.SSH_MSG_REQUEST_FAILURE)); + } + } } @Override diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java index 1213458c1..2065b8006 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java @@ -21,24 +21,19 @@ package org.apache.sshd.common.session.helpers; import java.io.IOException; import java.io.InterruptedIOException; import java.net.ProtocolException; -import java.net.SocketTimeoutException; import java.security.GeneralSecurityException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.Deque; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.function.LongConsumer; import org.apache.sshd.common.Closeable; import org.apache.sshd.common.Factory; @@ -60,7 +55,6 @@ import org.apache.sshd.common.forward.PortForwardingEventListener; import org.apache.sshd.common.future.DefaultSshFuture; import org.apache.sshd.common.future.GlobalRequestFuture; import org.apache.sshd.common.future.KeyExchangeFuture; -import org.apache.sshd.common.global.GlobalRequestException; import org.apache.sshd.common.io.IoSession; import org.apache.sshd.common.io.IoWriteFuture; import org.apache.sshd.common.kex.KexProposalOption; @@ -70,11 +64,11 @@ import org.apache.sshd.common.kex.extension.KexExtensionHandler.AvailabilityPhas import org.apache.sshd.common.kex.extension.KexExtensions; import org.apache.sshd.common.mac.MacInformation; import org.apache.sshd.common.random.Random; +import org.apache.sshd.common.session.ConnectionService; import org.apache.sshd.common.session.ReservedSessionMessagesHandler; import org.apache.sshd.common.session.Session; import org.apache.sshd.common.session.SessionListener; import org.apache.sshd.common.session.filters.CryptFilter; -import org.apache.sshd.common.session.filters.CryptFilter.EncryptionListener; import org.apache.sshd.common.session.filters.SshIdentHandler; import org.apache.sshd.common.session.filters.SshTransportFilter; import org.apache.sshd.common.session.filters.kex.KexListener; @@ -83,7 +77,6 @@ import org.apache.sshd.common.util.ExceptionUtils; import org.apache.sshd.common.util.GenericUtils; import org.apache.sshd.common.util.ValidateUtils; import org.apache.sshd.common.util.buffer.Buffer; -import org.apache.sshd.common.util.buffer.ByteArrayBuffer; import org.apache.sshd.core.CoreModuleProperties; /** @@ -135,43 +128,6 @@ public abstract class AbstractSession extends SessionHelper { protected String serverVersion; protected String clientVersion; - /** - * Used to wait for results of global requests sent with {@code want-reply = true}. Note that per RFC 4254, global - * requests may be sent at any time, but success/failure replies MUST come in the order the requests were sent. Some - * implementations may also reply with SSH_MSG_UNIMPLEMENTED, on which RFC 4253 says they must be sent in the order - * the message was received. - * <p> - * This implies that it is legal to send "nested" global requests: a client or server may send two (or more) global - * requests, and then receives two (or more) replies in the correct order: first reply for the first request sent; - * second reply for the second request sent. - * </p> - * <p> - * We keep a FIFO list of pending global requests for which we expect a reply. We always add new global requests at - * the head. For success and failure replies, which don't identify the message sequence number of the global - * request, we apply the reply to the tail of the list. For unimplemented messages, we apply it to the request - * identified by the message sequence number, which normally also should be the tail. - * </p> - * <p> - * When a reply is received, the corresponding global request is removed from the list. - * </p> - * <p> - * Global requests sent with {@code want-reply = false} are never added to this list; they are fire-and-forget. - * According to the SSH RFCs, the peer MUST not reply on a message with {@code want-reply = false}. If it does so - * all the same, it is broken. We might then apply the result to the wrong pending global request if we have any. - * </p> - * - * @see <a href="https://tools.ietf.org/html/rfc4254#section-4">RFC 4254: Global Requests</a> - * @see <a href="https://tools.ietf.org/html/rfc4253#section-11.4">RFC 4254: Reserved Messages</a> - * @see #request(Buffer, String, org.apache.sshd.common.future.GlobalRequestFuture.ReplyHandler) - * @see #requestSuccess(Buffer) - * @see #requestFailure(Buffer) - * @see #doInvokeUnimplementedMessageHandler(int, Buffer) - * @see #preClose() - */ - private final Deque<GlobalRequestFuture> pendingGlobalRequests = new ConcurrentLinkedDeque<>(); - - private final Map<Buffer, LongConsumer> globalSequenceNumbers = new ConcurrentHashMap<>(); - private final FilterChain filters = new DefaultFilterChain(); private SshTransportFilter sshTransport; @@ -327,15 +283,7 @@ public abstract class AbstractSession extends SessionHelper { } } }; - EncryptionListener sequenceListener = (buffer, sequenceNumber) -> { - // SSHD-968 - remember global request outgoing sequence number - LongConsumer setter = globalSequenceNumbers.remove(buffer); - if (setter != null) { - setter.accept(sequenceNumber); - } - }; - sshTransport = new SshTransportFilter(this, random, identities, sessionEvents, sequenceListener, - this::getKexProposal, this::checkKeys); + sshTransport = new SshTransportFilter(this, random, identities, sessionEvents, this::getKexProposal, this::checkKeys); filters.addLast(sshTransport); } @@ -584,19 +532,6 @@ public abstract class AbstractSession extends SessionHelper { sshTransport.shutdown(); } - // if anyone waiting for global response notify them about the closing session - boolean debugEnabled = log.isDebugEnabled(); - for (;;) { - GlobalRequestFuture future = pendingGlobalRequests.pollLast(); - if (future == null) { - break; - } - if (debugEnabled) { - log.debug("preClose({}): Session closing; failing still pending global request {}", this, future.getId()); - } - future.setValue(new SshException("Session is closing")); - } - // Fire 'close' event try { signalSessionClosed(); @@ -682,184 +617,27 @@ public abstract class AbstractSession extends SessionHelper { return writeFuture; } - private boolean wantReply(Buffer buffer) { - // Examine the buffer to get the want-reply flag - int rpos = buffer.rpos(); - buffer.getByte(); // Skip command - buffer.getString(); // Skip request name - boolean replyFlag = buffer.getBoolean(); - buffer.rpos(rpos); // reset buffer - return replyFlag; - } - @Override public Buffer request(String request, Buffer buffer, long maxWaitMillis) throws IOException { - ValidateUtils.checkTrue(maxWaitMillis > 0, - "Requested timeout for " + request + " is not strictly greater than zero: " + maxWaitMillis); - boolean debugEnabled = log.isDebugEnabled(); - boolean withReply = wantReply(buffer); - GlobalRequestFuture future = request(buffer, request, null); - Object result; - boolean done = false; - try { - if (debugEnabled) { - log.debug("request({}) request={}, timeout={}ms", this, request, maxWaitMillis); - } - done = future.await(maxWaitMillis); - result = future.getValue(); - } catch (InterruptedIOException e) { - throw (InterruptedIOException) new InterruptedIOException( - "Interrupted while waiting for request=" + request + " result").initCause(e); - } - - if (!isOpen()) { - throw new IOException("Session was closed or closing while awaiting reply for request=" + request); - } - - if (withReply) { - if (debugEnabled) { - log.debug("request({}) request={}, timeout={}ms, requestSeqNo={}, done {}, result received={}", this, request, - maxWaitMillis, future.getSequenceNumber(), done, result instanceof Buffer); - } - - if (!done || result == null) { - throw new SocketTimeoutException("No response received after " + maxWaitMillis + "ms for request=" + request); - } - // The operation is specified to return null if the request could be made, but got an error reply. - // The caller cannot distinguish between SSH_MSG_UNIMPLEMENTED and SSH_MSG_REQUEST_FAILURE. - if (result instanceof GlobalRequestException) { - if (debugEnabled) { - log.debug("request({}) request={}, requestSeqNo={}: received={}", this, request, future.getSequenceNumber(), - SshConstants.getCommandMessageName(((GlobalRequestException) result).getCode())); - } - return null; - } - } - - if (result instanceof Throwable) { - throw new IOException("Exception on request " + request, (Throwable) result); - } - if (result instanceof Buffer) { - return (Buffer) result; - } - return null; + ConnectionService service = getCurrentService(ConnectionService.class); + ValidateUtils.checkNotNull(service, "Current service is not a ConnectionService"); + return service.request(request, buffer, maxWaitMillis); } @Override public GlobalRequestFuture request(Buffer buffer, String request, GlobalRequestFuture.ReplyHandler replyHandler) throws IOException { - GlobalRequestFuture globalRequest; - if (!wantReply(buffer)) { - if (!isOpen()) { - throw new IOException("Global request " + request + ": session is closing or closed."); - } - // Fire-and-forget global requests (want-reply = false) are always allowed; we don't need to register the - // future, nor do we have to wait for anything. Client code can wait on the returned future if it wants to - // be sure the message has been sent. - globalRequest = new GlobalRequestFuture(request, replyHandler) { - - @Override - public void operationComplete(IoWriteFuture future) { - if (future.isWritten()) { - if (log.isDebugEnabled()) { - log.debug("makeGlobalRequest({})[{}] want-reply=false sent", this, getId()); - } - setValue(new ByteArrayBuffer(new byte[0])); - GlobalRequestFuture.ReplyHandler handler = getHandler(); - if (handler != null) { - handler.accept(SshConstants.SSH_MSG_REQUEST_SUCCESS, getBuffer()); - } - } - super.operationComplete(future); - } - }; - writePacket(buffer).addListener(globalRequest); - return globalRequest; - } - // We do expect a reply. The packet may get queued or otherwise delayed for an unknown time. We must - // consider this request pending only once its sequence number is known. If sending the message fails, - // the writeFuture will set an exception on the globalRequest, or will fail it. - globalRequest = new GlobalRequestFuture(request, replyHandler) { - - @Override - public void operationComplete(IoWriteFuture future) { - if (!future.isWritten()) { - // If it was not written after all, make sure it's not considered pending anymore. - pendingGlobalRequests.removeFirstOccurrence(this); - } - // Super call will fulfill the future if not written - super.operationComplete(future); - if (future.isWritten() && getHandler() != null) { - // Fulfill this future now. The GlobalRequestFuture can thus be used to wait for the - // successful sending of the request, the framework will invoke the handler whenever - // the reply arrives. The buffer cannot be obtained though the future. - setValue(null); - } - } - }; - if (!isOpen()) { - throw new IOException("Global request " + request + ": session is closing or closed."); - } - // This consumer will be invoked once before the packet actually goes out. Some servers respond to global - // requests with SSH_MSG_UNIMPLEMENTED instead of SSH_MSG_REQUEST_FAILURE (see SSHD-968), so we need to make - // sure we do know the sequence number. - globalSequenceNumbers.put(buffer, seqNo -> { - globalRequest.setSequenceNumber(seqNo); - if (log.isDebugEnabled()) { - log.debug("makeGlobalRequest({})[{}] want-reply=true with seqNo={}", this, globalRequest.getId(), seqNo); - } - // Insert at front - pendingGlobalRequests.push(globalRequest); - }); - writePacket(buffer).addListener(f -> { - Throwable t = f.getException(); - if (t != null) { - // Just in case we get an exception before preProcessEncodeBuffer was even called - globalSequenceNumbers.remove(buffer); - } - }).addListener(globalRequest); // Report errors through globalRequest, fulfilling globalRequest - return globalRequest; + ConnectionService service = getCurrentService(ConnectionService.class); + ValidateUtils.checkNotNull(service, "Current service is not a ConnectionService"); + return service.request(buffer, request, replyHandler); } @Override protected boolean doInvokeUnimplementedMessageHandler(int cmd, Buffer buffer) throws Exception { - /* - * SSHD-968 Some servers respond to global requests with SSH_MSG_UNIMPLEMENTED instead of - * SSH_MSG_REQUEST_FAILURE (as mandated by https://tools.ietf.org/html/rfc4254#section-4) so deal with it. - */ - if (!pendingGlobalRequests.isEmpty() && cmd == SshConstants.SSH_MSG_UNIMPLEMENTED) { - // We do have ongoing global requests. - long msgSeqNo = buffer.rawUInt(buffer.rpos()); - - // Find the global request this applies to - GlobalRequestFuture future = pendingGlobalRequests.stream().filter(f -> f.getSequenceNumber() == msgSeqNo).findAny() - .orElse(null); - if (future != null && pendingGlobalRequests.removeFirstOccurrence(future)) { - // This SSH_MSG_UNIMPLEMENTED was the reply to a global request. - if (log.isDebugEnabled()) { - log.debug("doInvokeUnimplementedMessageHandler({}) report global request={} failure for seqNo={}", this, - future.getId(), msgSeqNo); - } - GlobalRequestFuture.ReplyHandler handler = future.getHandler(); - if (handler != null) { - Buffer resultBuf = ByteArrayBuffer.getCompactClone(buffer.array(), buffer.rpos(), buffer.available()); - handler.accept(cmd, resultBuf); - } else { - future.setValue(new GlobalRequestException(cmd)); - } - return true; // message handled internally - } else if (future != null) { - // The SSH_MSG_UNIMPLEMENTED was for a global request, but that request is no longer in the list: it - // got terminated otherwise. - return true; - } - if (log.isTraceEnabled()) { - log.trace( - "doInvokeUnimplementedMessageHandler({}) SSH_MSG_UNIMPLEMENTED with message seqNo={} not for a global request", - this, msgSeqNo); - } + ReservedSessionMessagesHandler service = getCurrentService(ReservedSessionMessagesHandler.class); + if (service != null && service.handleUnimplementedMessage(this, cmd, buffer)) { + return true; } - return super.doInvokeUnimplementedMessageHandler(cmd, buffer); } @@ -929,49 +707,6 @@ public abstract class AbstractSession extends SessionHelper { return sendNotImplemented(sshTransport.getLastInputSequenceNumber()); } - /** - * Indicates the reception of a {@code SSH_MSG_REQUEST_SUCCESS} message - * - * @param buffer The {@link Buffer} containing the message data - * @throws Exception If failed to handle the message - */ - protected void requestSuccess(Buffer buffer) throws Exception { - resetIdleTimeout(); - // Remove at end - GlobalRequestFuture request = pendingGlobalRequests.pollLast(); - if (request != null) { - // use a copy of the original data in case it is re-used on return - Buffer resultBuf = ByteArrayBuffer.getCompactClone(buffer.array(), buffer.rpos(), buffer.available()); - GlobalRequestFuture.ReplyHandler handler = request.getHandler(); - if (handler != null) { - handler.accept(SshConstants.SSH_MSG_REQUEST_SUCCESS, resultBuf); - } else { - request.setValue(resultBuf); - } - } - } - - /** - * Indicates the reception of a {@code SSH_MSG_REQUEST_FAILURE} message - * - * @param buffer The {@link Buffer} containing the message data - * @throws Exception If failed to handle the message - */ - protected void requestFailure(Buffer buffer) throws Exception { - resetIdleTimeout(); - // Remove at end - GlobalRequestFuture request = pendingGlobalRequests.pollLast(); - if (request != null) { - GlobalRequestFuture.ReplyHandler handler = request.getHandler(); - if (handler != null) { - Buffer resultBuf = ByteArrayBuffer.getCompactClone(buffer.array(), buffer.rpos(), buffer.available()); - handler.accept(SshConstants.SSH_MSG_REQUEST_FAILURE, resultBuf); - } else { - request.setValue(new GlobalRequestException(SshConstants.SSH_MSG_REQUEST_FAILURE)); - } - } - } - @Override public void addSessionListener(SessionListener listener) { SessionListener.validateListener(listener); @@ -1135,6 +870,14 @@ public abstract class AbstractSession extends SessionHelper { } } + protected <T> T getCurrentService(Class<? extends T> type) { + Service service = currentService.getService(); + if (type.isInstance(service)) { + return type.cast(service); + } + return null; + } + /** * Indicates the the key exchange is completed and the exchanged keys can now be verified - e.g., client can verify * the server's key