This is an automated email from the ASF dual-hosted git repository.

twolf pushed a commit to branch dev_3.0
in repository https://gitbox.apache.org/repos/asf/mina-sshd.git

commit 812c2e838d7b1af21af8dc1051e5516f72dedf36
Author: Thomas Wolf <tw...@apache.org>
AuthorDate: Sun Apr 6 14:19:06 2025 +0200

    Move SSH_MSG_GLOBAL_REQUEST handling to the ConnectionService
    
    Now that all the SSH transport stuff has gone from AbstractSession,
    it's evident that the session contains something it shouldn't: global
    requests are a feature of the connection service. So move the whole
    implementation into the AbstractConnectionService. Keep the convenience
    request() methods on the session, but just forward to the service, if
    the current service _is_ a ConnectionService.
---
 .../sshd/common/session/ConnectionService.java     |  79 ++++++
 .../common/session/filters/SshTransportFilter.java |  24 +-
 .../sshd/common/session/filters/kex/KexFilter.java |   6 +-
 .../session/helpers/AbstractConnectionService.java | 294 +++++++++++++++++++-
 .../common/session/helpers/AbstractSession.java    | 295 ++-------------------
 5 files changed, 405 insertions(+), 293 deletions(-)

diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/session/ConnectionService.java 
b/sshd-core/src/main/java/org/apache/sshd/common/session/ConnectionService.java
index 4340da3d0..67bb13f3f 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/common/session/ConnectionService.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/common/session/ConnectionService.java
@@ -19,6 +19,9 @@
 package org.apache.sshd.common.session;
 
 import java.io.IOException;
+import java.time.Duration;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.sshd.agent.common.AgentForwardSupport;
 import org.apache.sshd.common.Service;
@@ -26,6 +29,10 @@ import org.apache.sshd.common.channel.Channel;
 import org.apache.sshd.common.forward.Forwarder;
 import org.apache.sshd.common.forward.PortForwardingEventListenerManager;
 import org.apache.sshd.common.forward.PortForwardingEventListenerManagerHolder;
+import org.apache.sshd.common.future.GlobalRequestFuture;
+import org.apache.sshd.common.future.GlobalRequestFuture.ReplyHandler;
+import org.apache.sshd.common.util.ValidateUtils;
+import org.apache.sshd.common.util.buffer.Buffer;
 import org.apache.sshd.server.x11.X11ForwardSupport;
 
 /**
@@ -63,6 +70,78 @@ public interface ConnectionService
      */
     Forwarder getForwarder();
 
+    /**
+     * Send a global request and wait for the response, if the request is sent 
with {@code want-reply = true}.
+     *
+     * @param  request                         the request name - used mainly 
for logging and debugging
+     * @param  buffer                          the buffer containing the 
global request
+     * @param  timeout                         The number of time units to 
wait - must be <U>positive</U>
+     * @param  unit                            The {@link TimeUnit} to wait 
for the response
+     * @return                                 the return buffer if the 
request was successful, {@code null} otherwise.
+     * @throws IOException                     if an error occurred when 
encoding or sending the packet
+     * @throws java.net.SocketTimeoutException If no response received within 
specified timeout
+     */
+    default Buffer request(String request, Buffer buffer, long timeout, 
TimeUnit unit) throws IOException {
+        ValidateUtils.checkTrue(timeout > 0L, "Non-positive timeout requested: 
%d", timeout);
+        return request(request, buffer, TimeUnit.MILLISECONDS.convert(timeout, 
unit));
+    }
+
+    /**
+     *
+     * Send a global request and wait for the response, if the request is sent 
with {@code want-reply = true}.
+     *
+     * @param  request                         the request name - used mainly 
for logging and debugging
+     * @param  buffer                          the buffer containing the 
global request
+     * @param  timeout                         The (never {@code null}) 
timeout to wait - its milliseconds value is used
+     * @return                                 the return buffer if the 
request was successful, {@code null} otherwise.
+     * @throws IOException                     if an error occurred when 
encoding or sending the packet
+     * @throws java.net.SocketTimeoutException If no response received within 
specified timeout
+     */
+    default Buffer request(String request, Buffer buffer, Duration timeout) 
throws IOException {
+        Objects.requireNonNull(timeout, "No timeout specified");
+        return request(request, buffer, timeout.toMillis());
+    }
+
+    /**
+     * Send a global request and wait for the response, if the request is sent 
with {@code want-reply = true}.
+     *
+     * @param  request                         the request name - used mainly 
for logging and debugging
+     * @param  buffer                          the buffer containing the 
global request
+     * @param  maxWaitMillis                   maximum time in milliseconds to 
wait for the request to finish - must be
+     *                                         <U>positive</U>
+     * @return                                 the return buffer if the 
request was successful, {@code null} otherwise.
+     * @throws IOException                     if an error occurred when 
encoding or sending the packet
+     * @throws java.net.SocketTimeoutException If no response received within 
specified timeout
+     */
+    Buffer request(String request, Buffer buffer, long maxWaitMillis) throws 
IOException;
+
+    /**
+     * Send a global request and handle the reply asynchronously. If {@code 
want-reply = true}, pass the received
+     * {@link Buffer} to the given {@link ReplyHandler}, which may execute in 
a different thread.
+     *
+     * <dl>
+     * <dt>want-reply == true && replyHandler != null</dt>
+     * <dd>The returned future is fulfilled with {@code null} when the request 
was sent, or with an exception if the
+     * request could not be sent. The {@code replyHandler} is invoked once the 
reply is received, with the SSH reply
+     * code and the data received.</dd>
+     * <dt>want-reply == true && replyHandler == null</dt>
+     * <dd>The returned future is fulfilled with an exception if the request 
could not be sent, or a failure reply was
+     * received. If a success reply was received, the future is fulfilled with 
the received data buffer.</dd>
+     * <dt>want-reply == false</dt>
+     * <dd>The returned future is fulfilled with an empty {@link Buffer} when 
the request was sent, or with an exception
+     * if the request could not be sent. If a reply handler is given, it is 
invoked with that empty buffer. The handler
+     * is not invoked if sending the request failed.</dd>
+     * </dl>
+     *
+     * @param  buffer       the {@link Buffer} containing the global request, 
with the {@code want-reply} flag set as
+     *                      appropriate
+     * @param  request      the request name
+     * @param  replyHandler {@link ReplyHandler} for handling the reply; may 
be {@code null}
+     * @return              Created {@link GlobalRequestFuture}
+     * @throws IOException  if an error occurred while encoding or sending the 
packet
+     */
+    GlobalRequestFuture request(Buffer buffer, String request, ReplyHandler 
replyHandler) throws IOException;
+
     // TODO: remove from interface, it's server side only
     AgentForwardSupport getAgentForwardSupport();
 
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/session/filters/SshTransportFilter.java
 
b/sshd-core/src/main/java/org/apache/sshd/common/session/filters/SshTransportFilter.java
index b6c742298..2223b29a8 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/common/session/filters/SshTransportFilter.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/common/session/filters/SshTransportFilter.java
@@ -55,16 +55,15 @@ public class SshTransportFilter extends IoFilter {
     /**
      * Creates a new SSH transport filter.
      *
-     * @param session       {@link AbstractSession} this filter is for
-     * @param random        {@link Random} instance to use
-     * @param identities    {@link SshIdentHandler} for handling the SSH 
identificaton string
-     * @param events        {@link SessionListener} to report some events
-     * @param cryptListener {@link EncryptionListener} called just before a 
buffer is encrypted
-     * @param proposer      {@link Proposer} to get KEX proposals
-     * @param checker       {@link HostKeyChecker} to check the peer's host 
key; may be {@code null} if on a server
+     * @param session    {@link AbstractSession} this filter is for
+     * @param random     {@link Random} instance to use
+     * @param identities {@link SshIdentHandler} for handling the SSH 
identificaton string
+     * @param events     {@link SessionListener} to report some events
+     * @param proposer   {@link Proposer} to get KEX proposals
+     * @param checker    {@link HostKeyChecker} to check the peer's host key; 
may be {@code null} if on a server
      */
     public SshTransportFilter(AbstractSession session, Random random, 
SshIdentHandler identities, SessionListener events,
-                              EncryptionListener cryptListener, Proposer 
proposer, HostKeyChecker checker) {
+                              Proposer proposer, HostKeyChecker checker) {
         IdentFilter ident = new IdentFilter();
         ident.setPropertyResolver(session);
         ident.setIdentHandler(identities);
@@ -73,7 +72,6 @@ public class SshTransportFilter extends IoFilter {
         cryptFilter = new CryptFilter();
         cryptFilter.setSession(session);
         cryptFilter.setRandom(random);
-        cryptFilter.addEncryptionListener(cryptListener);
         filters.addLast(cryptFilter);
 
         compressionFilter = new CompressionFilter();
@@ -159,6 +157,14 @@ public class SshTransportFilter extends IoFilter {
         kexFilter.removeKexListener(listener);
     }
 
+    public void addEncryptionListener(EncryptionListener listener) {
+        cryptFilter.addEncryptionListener(listener);
+    }
+
+    public void removeEncryptionListener(EncryptionListener listener) {
+        cryptFilter.removeEncryptionListener(listener);
+    }
+
     public boolean isSecure() {
         return cryptFilter.isSecure();
     }
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/session/filters/kex/KexFilter.java
 
b/sshd-core/src/main/java/org/apache/sshd/common/session/filters/kex/KexFilter.java
index 09a19719e..d72fe506b 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/common/session/filters/kex/KexFilter.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/common/session/filters/kex/KexFilter.java
@@ -1198,16 +1198,16 @@ public class KexFilter extends IoFilter {
 
         private boolean isWantReply(Buffer message, boolean isChannelRequest) {
             boolean wantReply = false;
-            int pos = message.rpos();
+            int mark = message.rpos();
             message.getUByte();
             if (isChannelRequest) {
                 message.getUInt(); // Skip the channel id
             }
             long length = message.getUInt();
             if (length < message.available()) {
-                wantReply = message.rawByte(pos + 5 + (int) length) != 0;
+                wantReply = message.rawByte(message.rpos() + (int) length) != 
0;
             }
-            message.rpos(pos);
+            message.rpos(mark);
             return wantReply;
         }
 
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java
 
b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java
index 9251d651d..2580e5e35 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java
@@ -19,13 +19,17 @@
 package org.apache.sshd.common.session.helpers;
 
 import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.net.SocketTimeoutException;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Deque;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
@@ -34,6 +38,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.IntUnaryOperator;
+import java.util.function.LongConsumer;
 
 import org.apache.sshd.agent.common.AgentForwardSupport;
 import org.apache.sshd.agent.common.DefaultAgentForwardSupport;
@@ -42,6 +47,7 @@ import org.apache.sshd.client.future.OpenFuture;
 import org.apache.sshd.common.Closeable;
 import org.apache.sshd.common.FactoryManager;
 import org.apache.sshd.common.SshConstants;
+import org.apache.sshd.common.SshException;
 import org.apache.sshd.common.channel.Channel;
 import org.apache.sshd.common.channel.ChannelFactory;
 import org.apache.sshd.common.channel.ChannelListener;
@@ -53,7 +59,9 @@ import org.apache.sshd.common.forward.Forwarder;
 import org.apache.sshd.common.forward.ForwarderFactory;
 import org.apache.sshd.common.forward.PortForwardingEventListener;
 import org.apache.sshd.common.forward.PortForwardingEventListenerManager;
+import org.apache.sshd.common.future.GlobalRequestFuture;
 import org.apache.sshd.common.future.HasException;
+import org.apache.sshd.common.global.GlobalRequestException;
 import org.apache.sshd.common.io.AbstractIoWriteFuture;
 import org.apache.sshd.common.io.IoWriteFuture;
 import org.apache.sshd.common.kex.KexState;
@@ -61,10 +69,12 @@ import org.apache.sshd.common.session.ConnectionService;
 import org.apache.sshd.common.session.ReservedSessionMessagesHandler;
 import org.apache.sshd.common.session.Session;
 import org.apache.sshd.common.session.UnknownChannelReferenceHandler;
+import org.apache.sshd.common.session.filters.CryptFilter.EncryptionListener;
 import org.apache.sshd.common.util.EventListenerUtils;
 import org.apache.sshd.common.util.GenericUtils;
 import org.apache.sshd.common.util.ValidateUtils;
 import org.apache.sshd.common.util.buffer.Buffer;
+import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
 import org.apache.sshd.common.util.closeable.AbstractInnerCloseable;
 import org.apache.sshd.common.util.functors.Int2IntFunction;
 import org.apache.sshd.core.CoreModuleProperties;
@@ -78,7 +88,7 @@ import org.apache.sshd.server.x11.X11ForwardSupport;
  */
 public abstract class AbstractConnectionService
         extends AbstractInnerCloseable
-        implements ConnectionService {
+        implements ConnectionService, ReservedSessionMessagesHandler {
 
     /**
      * Default growth factor function used to resize response buffers
@@ -110,6 +120,45 @@ public abstract class AbstractConnectionService
     private final AbstractSession sessionInstance;
     private UnknownChannelReferenceHandler unknownChannelReferenceHandler;
 
+    /**
+     * Used to wait for results of global requests sent with {@code want-reply 
= true}. Note that per RFC 4254, global
+     * requests may be sent at any time, but success/failure replies MUST come 
in the order the requests were sent. Some
+     * implementations may also reply with SSH_MSG_UNIMPLEMENTED, on which RFC 
4253 says they must be sent in the order
+     * the message was received.
+     * <p>
+     * This implies that it is legal to send "nested" global requests: a 
client or server may send two (or more) global
+     * requests, and then receives two (or more) replies in the correct order: 
first reply for the first request sent;
+     * second reply for the second request sent.
+     * </p>
+     * <p>
+     * We keep a FIFO list of pending global requests for which we expect a 
reply. We always add new global requests at
+     * the head. For success and failure replies, which don't identify the 
message sequence number of the global
+     * request, we apply the reply to the tail of the list. For unimplemented 
messages, we apply it to the request
+     * identified by the message sequence number, which normally also should 
be the tail.
+     * </p>
+     * <p>
+     * When a reply is received, the corresponding global request is removed 
from the list.
+     * </p>
+     * <p>
+     * Global requests sent with {@code want-reply = false} are never added to 
this list; they are fire-and-forget.
+     * According to the SSH RFCs, the peer MUST not reply on a message with 
{@code want-reply = false}. If it does so
+     * all the same, it is broken. We might then apply the result to the wrong 
pending global request if we have any.
+     * </p>
+     *
+     * @see <a href="https://tools.ietf.org/html/rfc4254#section-4";>RFC 4254: 
Global Requests</a>
+     * @see <a href="https://tools.ietf.org/html/rfc4253#section-11.4";>RFC 
4254: Reserved Messages</a>
+     * @see #request(Buffer, String, 
org.apache.sshd.common.future.GlobalRequestFuture.ReplyHandler)
+     * @see #requestSuccess(Buffer)
+     * @see #requestFailure(Buffer)
+     * @see #doInvokeUnimplementedMessageHandler(int, Buffer)
+     * @see #preClose()
+     */
+    private final Deque<GlobalRequestFuture> pendingGlobalRequests = new 
ConcurrentLinkedDeque<>();
+
+    private final Map<Buffer, LongConsumer> globalSequenceNumbers = new 
ConcurrentHashMap<>();
+
+    private EncryptionListener sequenceListener;
+
     protected AbstractConnectionService(AbstractSession session) {
         sessionInstance = Objects.requireNonNull(session, "No session");
         listenerProxy = 
EventListenerUtils.proxyWrapper(PortForwardingEventListener.class, listeners);
@@ -180,6 +229,14 @@ public abstract class AbstractConnectionService
     @Override
     public void start() {
         heartBeat = startHeartBeat();
+        sequenceListener = (buffer, sequenceNumber) -> {
+            // SSHD-968 - remember global request outgoing sequence number
+            LongConsumer setter = globalSequenceNumbers.remove(buffer);
+            if (setter != null) {
+                setter.accept(sequenceNumber);
+            }
+        };
+        sessionInstance.getTransport().addEncryptionListener(sequenceListener);
     }
 
     protected synchronized ScheduledFuture<?> startHeartBeat() {
@@ -318,8 +375,201 @@ public abstract class AbstractConnectionService
         return forwarder;
     }
 
+    private boolean wantReply(Buffer buffer) {
+        // Examine the buffer to get the want-reply flag
+        int rpos = buffer.rpos();
+        buffer.getByte(); // Skip command
+        buffer.getString(); // Skip request name
+        boolean replyFlag = buffer.getBoolean();
+        buffer.rpos(rpos); // reset buffer
+        return replyFlag;
+    }
+
+    @Override
+    public Buffer request(String request, Buffer buffer, long maxWaitMillis) 
throws IOException {
+        ValidateUtils.checkTrue(maxWaitMillis > 0,
+                "Requested timeout for " + request + " is not strictly greater 
than zero: " + maxWaitMillis);
+        boolean debugEnabled = log.isDebugEnabled();
+        boolean withReply = wantReply(buffer);
+        GlobalRequestFuture future = request(buffer, request, null);
+        Object result;
+        boolean done = false;
+        try {
+            if (debugEnabled) {
+                log.debug("request({}) request={}, timeout={}ms", this, 
request, maxWaitMillis);
+            }
+            done = future.await(maxWaitMillis);
+            result = future.getValue();
+        } catch (InterruptedIOException e) {
+            throw (InterruptedIOException) new InterruptedIOException(
+                    "Interrupted while waiting for request=" + request + " 
result").initCause(e);
+        }
+
+        if (!isOpen()) {
+            throw new IOException("Session was closed or closing while 
awaiting reply for request=" + request);
+        }
+
+        if (withReply) {
+            if (debugEnabled) {
+                log.debug("request({}) request={}, timeout={}ms, 
requestSeqNo={}, done {}, result received={}", this, request,
+                        maxWaitMillis, future.getSequenceNumber(), done, 
result instanceof Buffer);
+            }
+
+            if (!done || result == null) {
+                throw new SocketTimeoutException("No response received after " 
+ maxWaitMillis + "ms for request=" + request);
+            }
+            // The operation is specified to return null if the request could 
be made, but got an error reply.
+            // The caller cannot distinguish between SSH_MSG_UNIMPLEMENTED and 
SSH_MSG_REQUEST_FAILURE.
+            if (result instanceof GlobalRequestException) {
+                if (debugEnabled) {
+                    log.debug("request({}) request={}, requestSeqNo={}: 
received={}", this, request, future.getSequenceNumber(),
+                            
SshConstants.getCommandMessageName(((GlobalRequestException) 
result).getCode()));
+                }
+                return null;
+            }
+        }
+
+        if (result instanceof Throwable) {
+            throw new IOException("Exception on request " + request, 
(Throwable) result);
+        }
+        if (result instanceof Buffer) {
+            return (Buffer) result;
+        }
+        return null;
+    }
+
+    @Override
+    public GlobalRequestFuture request(Buffer buffer, String request, 
GlobalRequestFuture.ReplyHandler replyHandler)
+            throws IOException {
+        GlobalRequestFuture globalRequest;
+        if (!wantReply(buffer)) {
+            if (!isOpen()) {
+                throw new IOException("Global request " + request + ": session 
is closing or closed.");
+            }
+            // Fire-and-forget global requests (want-reply = false) are always 
allowed; we don't need to register the
+            // future, nor do we have to wait for anything. Client code can 
wait on the returned future if it wants to
+            // be sure the message has been sent.
+            globalRequest = new GlobalRequestFuture(request, replyHandler) {
+
+                @Override
+                public void operationComplete(IoWriteFuture future) {
+                    if (future.isWritten()) {
+                        if (log.isDebugEnabled()) {
+                            log.debug("makeGlobalRequest({})[{}] 
want-reply=false sent", this, getId());
+                        }
+                        setValue(new ByteArrayBuffer(new byte[0]));
+                        GlobalRequestFuture.ReplyHandler handler = 
getHandler();
+                        if (handler != null) {
+                            
handler.accept(SshConstants.SSH_MSG_REQUEST_SUCCESS, getBuffer());
+                        }
+                    }
+                    super.operationComplete(future);
+                }
+            };
+            sessionInstance.writePacket(buffer).addListener(globalRequest);
+            return globalRequest;
+        }
+        // We do expect a reply. The packet may get queued or otherwise 
delayed for an unknown time. We must
+        // consider this request pending only once its sequence number is 
known. If sending the message fails,
+        // the writeFuture will set an exception on the globalRequest, or will 
fail it.
+        globalRequest = new GlobalRequestFuture(request, replyHandler) {
+
+            @Override
+            public void operationComplete(IoWriteFuture future) {
+                if (!future.isWritten()) {
+                    // If it was not written after all, make sure it's not 
considered pending anymore.
+                    pendingGlobalRequests.removeFirstOccurrence(this);
+                }
+                // Super call will fulfill the future if not written
+                super.operationComplete(future);
+                if (future.isWritten() && getHandler() != null) {
+                    // Fulfill this future now. The GlobalRequestFuture can 
thus be used to wait for the
+                    // successful sending of the request, the framework will 
invoke the handler whenever
+                    // the reply arrives. The buffer cannot be obtained though 
the future.
+                    setValue(null);
+                }
+            }
+        };
+        if (!isOpen()) {
+            throw new IOException("Global request " + request + ": session is 
closing or closed.");
+        }
+        // This consumer will be invoked once before the packet actually goes 
out. Some servers respond to global
+        // requests with SSH_MSG_UNIMPLEMENTED instead of 
SSH_MSG_REQUEST_FAILURE (see SSHD-968), so we need to make
+        // sure we do know the sequence number.
+        globalSequenceNumbers.put(buffer, seqNo -> {
+            globalRequest.setSequenceNumber(seqNo);
+            if (log.isDebugEnabled()) {
+                log.debug("makeGlobalRequest({})[{}] want-reply=true with 
seqNo={}", this, globalRequest.getId(), seqNo);
+            }
+            // Insert at front
+            pendingGlobalRequests.push(globalRequest);
+        });
+        sessionInstance.writePacket(buffer).addListener(f -> {
+            Throwable t = f.getException();
+            if (t != null) {
+                // Just in case we get an exception before 
preProcessEncodeBuffer was even called
+                globalSequenceNumbers.remove(buffer);
+            }
+        }).addListener(globalRequest); // Report errors through globalRequest, 
fulfilling globalRequest
+        return globalRequest;
+    }
+
+    @Override
+    public boolean handleUnimplementedMessage(Session session, int cmd, Buffer 
buffer) throws Exception {
+        /*
+         * SSHD-968 Some servers respond to global requests with 
SSH_MSG_UNIMPLEMENTED instead of
+         * SSH_MSG_REQUEST_FAILURE (as mandated by 
https://tools.ietf.org/html/rfc4254#section-4) so deal with it.
+         */
+        if (!pendingGlobalRequests.isEmpty()) {
+            // We do have ongoing global requests.
+            long msgSeqNo = buffer.rawUInt(buffer.rpos());
+
+            // Find the global request this applies to
+            GlobalRequestFuture future = 
pendingGlobalRequests.stream().filter(f -> f.getSequenceNumber() == 
msgSeqNo).findAny()
+                    .orElse(null);
+            if (future != null && 
pendingGlobalRequests.removeFirstOccurrence(future)) {
+                // This SSH_MSG_UNIMPLEMENTED was the reply to a global 
request.
+                if (log.isDebugEnabled()) {
+                    log.debug("doInvokeUnimplementedMessageHandler({}) report 
global request={} failure for seqNo={}", this,
+                            future.getId(), msgSeqNo);
+                }
+                GlobalRequestFuture.ReplyHandler handler = future.getHandler();
+                if (handler != null) {
+                    Buffer resultBuf = 
ByteArrayBuffer.getCompactClone(buffer.array(), buffer.rpos(), 
buffer.available());
+                    handler.accept(cmd, resultBuf);
+                } else {
+                    future.setValue(new GlobalRequestException(cmd));
+                }
+                return true; // message handled internally
+            } else if (future != null) {
+                // The SSH_MSG_UNIMPLEMENTED was for a global request, but 
that request is no longer in the list: it
+                // got terminated otherwise.
+                return true;
+            }
+            if (log.isTraceEnabled()) {
+                log.trace(
+                        "doInvokeUnimplementedMessageHandler({}) 
SSH_MSG_UNIMPLEMENTED with message seqNo={} not for a global request",
+                        this, msgSeqNo);
+            }
+        }
+        return false;
+    }
+
     @Override
     protected void preClose() {
+        // If anyone is waiting for a global response notify them about the 
closing session
+        boolean debugEnabled = log.isDebugEnabled();
+        for (;;) {
+            GlobalRequestFuture future = pendingGlobalRequests.pollLast();
+            if (future == null) {
+                break;
+            }
+            if (debugEnabled) {
+                log.debug("preClose({}): Session closing; failing still 
pending global request {}", this, future.getId());
+            }
+            future.setValue(new SshException("Session is closing"));
+        }
+
         stopHeartBeat();
         this.listeners.clear();
         this.managersHolder.clear();
@@ -390,6 +640,7 @@ public abstract class AbstractConnectionService
         return builder()
                 .sequential(forwarderHolder.get(), agentForwardHolder.get(), 
x11ForwardHolder.get())
                 .parallel(toString(), getChannels())
+                .run(this, () -> 
sessionInstance.getTransport().removeEncryptionListener(sequenceListener))
                 .build();
     }
 
@@ -912,14 +1163,47 @@ public abstract class AbstractConnectionService
         return session.writePacket(rsp);
     }
 
+    /**
+     * Indicates the reception of a {@code SSH_MSG_REQUEST_SUCCESS} message
+     *
+     * @param  buffer    The {@link Buffer} containing the message data
+     * @throws Exception If failed to handle the message
+     */
     protected void requestSuccess(Buffer buffer) throws Exception {
-        AbstractSession s = getSession();
-        s.requestSuccess(buffer);
+        sessionInstance.resetIdleTimeout();
+        // Remove at end
+        GlobalRequestFuture request = pendingGlobalRequests.pollLast();
+        if (request != null) {
+            // use a copy of the original data in case it is re-used on return
+            Buffer resultBuf = ByteArrayBuffer.getCompactClone(buffer.array(), 
buffer.rpos(), buffer.available());
+            GlobalRequestFuture.ReplyHandler handler = request.getHandler();
+            if (handler != null) {
+                handler.accept(SshConstants.SSH_MSG_REQUEST_SUCCESS, 
resultBuf);
+            } else {
+                request.setValue(resultBuf);
+            }
+        }
     }
 
+    /**
+     * Indicates the reception of a {@code SSH_MSG_REQUEST_FAILURE} message
+     *
+     * @param  buffer    The {@link Buffer} containing the message data
+     * @throws Exception If failed to handle the message
+     */
     protected void requestFailure(Buffer buffer) throws Exception {
-        AbstractSession s = getSession();
-        s.requestFailure(buffer);
+        sessionInstance.resetIdleTimeout();
+        // Remove at end
+        GlobalRequestFuture request = pendingGlobalRequests.pollLast();
+        if (request != null) {
+            GlobalRequestFuture.ReplyHandler handler = request.getHandler();
+            if (handler != null) {
+                Buffer resultBuf = 
ByteArrayBuffer.getCompactClone(buffer.array(), buffer.rpos(), 
buffer.available());
+                handler.accept(SshConstants.SSH_MSG_REQUEST_FAILURE, 
resultBuf);
+            } else {
+                request.setValue(new 
GlobalRequestException(SshConstants.SSH_MSG_REQUEST_FAILURE));
+            }
+        }
     }
 
     @Override
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java
 
b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java
index 1213458c1..2065b8006 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java
@@ -21,24 +21,19 @@ package org.apache.sshd.common.session.helpers;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.net.ProtocolException;
-import java.net.SocketTimeoutException;
 import java.security.GeneralSecurityException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Deque;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.function.LongConsumer;
 
 import org.apache.sshd.common.Closeable;
 import org.apache.sshd.common.Factory;
@@ -60,7 +55,6 @@ import 
org.apache.sshd.common.forward.PortForwardingEventListener;
 import org.apache.sshd.common.future.DefaultSshFuture;
 import org.apache.sshd.common.future.GlobalRequestFuture;
 import org.apache.sshd.common.future.KeyExchangeFuture;
-import org.apache.sshd.common.global.GlobalRequestException;
 import org.apache.sshd.common.io.IoSession;
 import org.apache.sshd.common.io.IoWriteFuture;
 import org.apache.sshd.common.kex.KexProposalOption;
@@ -70,11 +64,11 @@ import 
org.apache.sshd.common.kex.extension.KexExtensionHandler.AvailabilityPhas
 import org.apache.sshd.common.kex.extension.KexExtensions;
 import org.apache.sshd.common.mac.MacInformation;
 import org.apache.sshd.common.random.Random;
+import org.apache.sshd.common.session.ConnectionService;
 import org.apache.sshd.common.session.ReservedSessionMessagesHandler;
 import org.apache.sshd.common.session.Session;
 import org.apache.sshd.common.session.SessionListener;
 import org.apache.sshd.common.session.filters.CryptFilter;
-import org.apache.sshd.common.session.filters.CryptFilter.EncryptionListener;
 import org.apache.sshd.common.session.filters.SshIdentHandler;
 import org.apache.sshd.common.session.filters.SshTransportFilter;
 import org.apache.sshd.common.session.filters.kex.KexListener;
@@ -83,7 +77,6 @@ import org.apache.sshd.common.util.ExceptionUtils;
 import org.apache.sshd.common.util.GenericUtils;
 import org.apache.sshd.common.util.ValidateUtils;
 import org.apache.sshd.common.util.buffer.Buffer;
-import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
 import org.apache.sshd.core.CoreModuleProperties;
 
 /**
@@ -135,43 +128,6 @@ public abstract class AbstractSession extends 
SessionHelper {
     protected String serverVersion;
     protected String clientVersion;
 
-    /**
-     * Used to wait for results of global requests sent with {@code want-reply 
= true}. Note that per RFC 4254, global
-     * requests may be sent at any time, but success/failure replies MUST come 
in the order the requests were sent. Some
-     * implementations may also reply with SSH_MSG_UNIMPLEMENTED, on which RFC 
4253 says they must be sent in the order
-     * the message was received.
-     * <p>
-     * This implies that it is legal to send "nested" global requests: a 
client or server may send two (or more) global
-     * requests, and then receives two (or more) replies in the correct order: 
first reply for the first request sent;
-     * second reply for the second request sent.
-     * </p>
-     * <p>
-     * We keep a FIFO list of pending global requests for which we expect a 
reply. We always add new global requests at
-     * the head. For success and failure replies, which don't identify the 
message sequence number of the global
-     * request, we apply the reply to the tail of the list. For unimplemented 
messages, we apply it to the request
-     * identified by the message sequence number, which normally also should 
be the tail.
-     * </p>
-     * <p>
-     * When a reply is received, the corresponding global request is removed 
from the list.
-     * </p>
-     * <p>
-     * Global requests sent with {@code want-reply = false} are never added to 
this list; they are fire-and-forget.
-     * According to the SSH RFCs, the peer MUST not reply on a message with 
{@code want-reply = false}. If it does so
-     * all the same, it is broken. We might then apply the result to the wrong 
pending global request if we have any.
-     * </p>
-     *
-     * @see <a href="https://tools.ietf.org/html/rfc4254#section-4";>RFC 4254: 
Global Requests</a>
-     * @see <a href="https://tools.ietf.org/html/rfc4253#section-11.4";>RFC 
4254: Reserved Messages</a>
-     * @see #request(Buffer, String, 
org.apache.sshd.common.future.GlobalRequestFuture.ReplyHandler)
-     * @see #requestSuccess(Buffer)
-     * @see #requestFailure(Buffer)
-     * @see #doInvokeUnimplementedMessageHandler(int, Buffer)
-     * @see #preClose()
-     */
-    private final Deque<GlobalRequestFuture> pendingGlobalRequests = new 
ConcurrentLinkedDeque<>();
-
-    private final Map<Buffer, LongConsumer> globalSequenceNumbers = new 
ConcurrentHashMap<>();
-
     private final FilterChain filters = new DefaultFilterChain();
 
     private SshTransportFilter sshTransport;
@@ -327,15 +283,7 @@ public abstract class AbstractSession extends 
SessionHelper {
                 }
             }
         };
-        EncryptionListener sequenceListener = (buffer, sequenceNumber) -> {
-            // SSHD-968 - remember global request outgoing sequence number
-            LongConsumer setter = globalSequenceNumbers.remove(buffer);
-            if (setter != null) {
-                setter.accept(sequenceNumber);
-            }
-        };
-        sshTransport = new SshTransportFilter(this, random, identities, 
sessionEvents, sequenceListener,
-                this::getKexProposal, this::checkKeys);
+        sshTransport = new SshTransportFilter(this, random, identities, 
sessionEvents, this::getKexProposal, this::checkKeys);
         filters.addLast(sshTransport);
     }
 
@@ -584,19 +532,6 @@ public abstract class AbstractSession extends 
SessionHelper {
             sshTransport.shutdown();
         }
 
-        // if anyone waiting for global response notify them about the closing 
session
-        boolean debugEnabled = log.isDebugEnabled();
-        for (;;) {
-            GlobalRequestFuture future = pendingGlobalRequests.pollLast();
-            if (future == null) {
-                break;
-            }
-            if (debugEnabled) {
-                log.debug("preClose({}): Session closing; failing still 
pending global request {}", this, future.getId());
-            }
-            future.setValue(new SshException("Session is closing"));
-        }
-
         // Fire 'close' event
         try {
             signalSessionClosed();
@@ -682,184 +617,27 @@ public abstract class AbstractSession extends 
SessionHelper {
         return writeFuture;
     }
 
-    private boolean wantReply(Buffer buffer) {
-        // Examine the buffer to get the want-reply flag
-        int rpos = buffer.rpos();
-        buffer.getByte(); // Skip command
-        buffer.getString(); // Skip request name
-        boolean replyFlag = buffer.getBoolean();
-        buffer.rpos(rpos); // reset buffer
-        return replyFlag;
-    }
-
     @Override
     public Buffer request(String request, Buffer buffer, long maxWaitMillis) 
throws IOException {
-        ValidateUtils.checkTrue(maxWaitMillis > 0,
-                "Requested timeout for " + request + " is not strictly greater 
than zero: " + maxWaitMillis);
-        boolean debugEnabled = log.isDebugEnabled();
-        boolean withReply = wantReply(buffer);
-        GlobalRequestFuture future = request(buffer, request, null);
-        Object result;
-        boolean done = false;
-        try {
-            if (debugEnabled) {
-                log.debug("request({}) request={}, timeout={}ms", this, 
request, maxWaitMillis);
-            }
-            done = future.await(maxWaitMillis);
-            result = future.getValue();
-        } catch (InterruptedIOException e) {
-            throw (InterruptedIOException) new InterruptedIOException(
-                    "Interrupted while waiting for request=" + request + " 
result").initCause(e);
-        }
-
-        if (!isOpen()) {
-            throw new IOException("Session was closed or closing while 
awaiting reply for request=" + request);
-        }
-
-        if (withReply) {
-            if (debugEnabled) {
-                log.debug("request({}) request={}, timeout={}ms, 
requestSeqNo={}, done {}, result received={}", this, request,
-                        maxWaitMillis, future.getSequenceNumber(), done, 
result instanceof Buffer);
-            }
-
-            if (!done || result == null) {
-                throw new SocketTimeoutException("No response received after " 
+ maxWaitMillis + "ms for request=" + request);
-            }
-            // The operation is specified to return null if the request could 
be made, but got an error reply.
-            // The caller cannot distinguish between SSH_MSG_UNIMPLEMENTED and 
SSH_MSG_REQUEST_FAILURE.
-            if (result instanceof GlobalRequestException) {
-                if (debugEnabled) {
-                    log.debug("request({}) request={}, requestSeqNo={}: 
received={}", this, request, future.getSequenceNumber(),
-                            
SshConstants.getCommandMessageName(((GlobalRequestException) 
result).getCode()));
-                }
-                return null;
-            }
-        }
-
-        if (result instanceof Throwable) {
-            throw new IOException("Exception on request " + request, 
(Throwable) result);
-        }
-        if (result instanceof Buffer) {
-            return (Buffer) result;
-        }
-        return null;
+        ConnectionService service = getCurrentService(ConnectionService.class);
+        ValidateUtils.checkNotNull(service, "Current service is not a 
ConnectionService");
+        return service.request(request, buffer, maxWaitMillis);
     }
 
     @Override
     public GlobalRequestFuture request(Buffer buffer, String request, 
GlobalRequestFuture.ReplyHandler replyHandler)
             throws IOException {
-        GlobalRequestFuture globalRequest;
-        if (!wantReply(buffer)) {
-            if (!isOpen()) {
-                throw new IOException("Global request " + request + ": session 
is closing or closed.");
-            }
-            // Fire-and-forget global requests (want-reply = false) are always 
allowed; we don't need to register the
-            // future, nor do we have to wait for anything. Client code can 
wait on the returned future if it wants to
-            // be sure the message has been sent.
-            globalRequest = new GlobalRequestFuture(request, replyHandler) {
-
-                @Override
-                public void operationComplete(IoWriteFuture future) {
-                    if (future.isWritten()) {
-                        if (log.isDebugEnabled()) {
-                            log.debug("makeGlobalRequest({})[{}] 
want-reply=false sent", this, getId());
-                        }
-                        setValue(new ByteArrayBuffer(new byte[0]));
-                        GlobalRequestFuture.ReplyHandler handler = 
getHandler();
-                        if (handler != null) {
-                            
handler.accept(SshConstants.SSH_MSG_REQUEST_SUCCESS, getBuffer());
-                        }
-                    }
-                    super.operationComplete(future);
-                }
-            };
-            writePacket(buffer).addListener(globalRequest);
-            return globalRequest;
-        }
-        // We do expect a reply. The packet may get queued or otherwise 
delayed for an unknown time. We must
-        // consider this request pending only once its sequence number is 
known. If sending the message fails,
-        // the writeFuture will set an exception on the globalRequest, or will 
fail it.
-        globalRequest = new GlobalRequestFuture(request, replyHandler) {
-
-            @Override
-            public void operationComplete(IoWriteFuture future) {
-                if (!future.isWritten()) {
-                    // If it was not written after all, make sure it's not 
considered pending anymore.
-                    pendingGlobalRequests.removeFirstOccurrence(this);
-                }
-                // Super call will fulfill the future if not written
-                super.operationComplete(future);
-                if (future.isWritten() && getHandler() != null) {
-                    // Fulfill this future now. The GlobalRequestFuture can 
thus be used to wait for the
-                    // successful sending of the request, the framework will 
invoke the handler whenever
-                    // the reply arrives. The buffer cannot be obtained though 
the future.
-                    setValue(null);
-                }
-            }
-        };
-        if (!isOpen()) {
-            throw new IOException("Global request " + request + ": session is 
closing or closed.");
-        }
-        // This consumer will be invoked once before the packet actually goes 
out. Some servers respond to global
-        // requests with SSH_MSG_UNIMPLEMENTED instead of 
SSH_MSG_REQUEST_FAILURE (see SSHD-968), so we need to make
-        // sure we do know the sequence number.
-        globalSequenceNumbers.put(buffer, seqNo -> {
-            globalRequest.setSequenceNumber(seqNo);
-            if (log.isDebugEnabled()) {
-                log.debug("makeGlobalRequest({})[{}] want-reply=true with 
seqNo={}", this, globalRequest.getId(), seqNo);
-            }
-            // Insert at front
-            pendingGlobalRequests.push(globalRequest);
-        });
-        writePacket(buffer).addListener(f -> {
-            Throwable t = f.getException();
-            if (t != null) {
-                // Just in case we get an exception before 
preProcessEncodeBuffer was even called
-                globalSequenceNumbers.remove(buffer);
-            }
-        }).addListener(globalRequest); // Report errors through globalRequest, 
fulfilling globalRequest
-        return globalRequest;
+        ConnectionService service = getCurrentService(ConnectionService.class);
+        ValidateUtils.checkNotNull(service, "Current service is not a 
ConnectionService");
+        return service.request(buffer, request, replyHandler);
     }
 
     @Override
     protected boolean doInvokeUnimplementedMessageHandler(int cmd, Buffer 
buffer) throws Exception {
-        /*
-         * SSHD-968 Some servers respond to global requests with 
SSH_MSG_UNIMPLEMENTED instead of
-         * SSH_MSG_REQUEST_FAILURE (as mandated by 
https://tools.ietf.org/html/rfc4254#section-4) so deal with it.
-         */
-        if (!pendingGlobalRequests.isEmpty() && cmd == 
SshConstants.SSH_MSG_UNIMPLEMENTED) {
-            // We do have ongoing global requests.
-            long msgSeqNo = buffer.rawUInt(buffer.rpos());
-
-            // Find the global request this applies to
-            GlobalRequestFuture future = 
pendingGlobalRequests.stream().filter(f -> f.getSequenceNumber() == 
msgSeqNo).findAny()
-                    .orElse(null);
-            if (future != null && 
pendingGlobalRequests.removeFirstOccurrence(future)) {
-                // This SSH_MSG_UNIMPLEMENTED was the reply to a global 
request.
-                if (log.isDebugEnabled()) {
-                    log.debug("doInvokeUnimplementedMessageHandler({}) report 
global request={} failure for seqNo={}", this,
-                            future.getId(), msgSeqNo);
-                }
-                GlobalRequestFuture.ReplyHandler handler = future.getHandler();
-                if (handler != null) {
-                    Buffer resultBuf = 
ByteArrayBuffer.getCompactClone(buffer.array(), buffer.rpos(), 
buffer.available());
-                    handler.accept(cmd, resultBuf);
-                } else {
-                    future.setValue(new GlobalRequestException(cmd));
-                }
-                return true; // message handled internally
-            } else if (future != null) {
-                // The SSH_MSG_UNIMPLEMENTED was for a global request, but 
that request is no longer in the list: it
-                // got terminated otherwise.
-                return true;
-            }
-            if (log.isTraceEnabled()) {
-                log.trace(
-                        "doInvokeUnimplementedMessageHandler({}) 
SSH_MSG_UNIMPLEMENTED with message seqNo={} not for a global request",
-                        this, msgSeqNo);
-            }
+        ReservedSessionMessagesHandler service = 
getCurrentService(ReservedSessionMessagesHandler.class);
+        if (service != null && service.handleUnimplementedMessage(this, cmd, 
buffer)) {
+            return true;
         }
-
         return super.doInvokeUnimplementedMessageHandler(cmd, buffer);
     }
 
@@ -929,49 +707,6 @@ public abstract class AbstractSession extends 
SessionHelper {
         return sendNotImplemented(sshTransport.getLastInputSequenceNumber());
     }
 
-    /**
-     * Indicates the reception of a {@code SSH_MSG_REQUEST_SUCCESS} message
-     *
-     * @param  buffer    The {@link Buffer} containing the message data
-     * @throws Exception If failed to handle the message
-     */
-    protected void requestSuccess(Buffer buffer) throws Exception {
-        resetIdleTimeout();
-        // Remove at end
-        GlobalRequestFuture request = pendingGlobalRequests.pollLast();
-        if (request != null) {
-            // use a copy of the original data in case it is re-used on return
-            Buffer resultBuf = ByteArrayBuffer.getCompactClone(buffer.array(), 
buffer.rpos(), buffer.available());
-            GlobalRequestFuture.ReplyHandler handler = request.getHandler();
-            if (handler != null) {
-                handler.accept(SshConstants.SSH_MSG_REQUEST_SUCCESS, 
resultBuf);
-            } else {
-                request.setValue(resultBuf);
-            }
-        }
-    }
-
-    /**
-     * Indicates the reception of a {@code SSH_MSG_REQUEST_FAILURE} message
-     *
-     * @param  buffer    The {@link Buffer} containing the message data
-     * @throws Exception If failed to handle the message
-     */
-    protected void requestFailure(Buffer buffer) throws Exception {
-        resetIdleTimeout();
-        // Remove at end
-        GlobalRequestFuture request = pendingGlobalRequests.pollLast();
-        if (request != null) {
-            GlobalRequestFuture.ReplyHandler handler = request.getHandler();
-            if (handler != null) {
-                Buffer resultBuf = 
ByteArrayBuffer.getCompactClone(buffer.array(), buffer.rpos(), 
buffer.available());
-                handler.accept(SshConstants.SSH_MSG_REQUEST_FAILURE, 
resultBuf);
-            } else {
-                request.setValue(new 
GlobalRequestException(SshConstants.SSH_MSG_REQUEST_FAILURE));
-            }
-        }
-    }
-
     @Override
     public void addSessionListener(SessionListener listener) {
         SessionListener.validateListener(listener);
@@ -1135,6 +870,14 @@ public abstract class AbstractSession extends 
SessionHelper {
         }
     }
 
+    protected <T> T getCurrentService(Class<? extends T> type) {
+        Service service = currentService.getService();
+        if (type.isInstance(service)) {
+            return type.cast(service);
+        }
+        return null;
+    }
+
     /**
      * Indicates the the key exchange is completed and the exchanged keys can 
now be verified - e.g., client can verify
      * the server's key

Reply via email to