This is an automated email from the ASF dual-hosted git repository.

twolf pushed a commit to branch dev_3.0
in repository https://gitbox.apache.org/repos/asf/mina-sshd.git

commit 3972387c03509acae59c5a18d48dec5e1b500b18
Author: Thomas Wolf <tw...@apache.org>
AuthorDate: Tue Apr 1 00:07:52 2025 +0200

    Introduce an InjectIgnoreFilter
    
    Add a filter that injects SSH_MSG_IGNORE packets of random length
    according to the properties configured. The filter injects ignore
    messages only when not in KEX, and not for low-level messages.
---
 .../sshd/common/session/filters/CryptFilter.java   |  14 +-
 .../common/session/filters/InjectIgnoreFilter.java | 186 ++++++++++-----------
 .../common/session/helpers/AbstractSession.java    |   3 +
 3 files changed, 105 insertions(+), 98 deletions(-)

diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/session/filters/CryptFilter.java
 
b/sshd-core/src/main/java/org/apache/sshd/common/session/filters/CryptFilter.java
index df7d41534..5915a00ef 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/common/session/filters/CryptFilter.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/common/session/filters/CryptFilter.java
@@ -46,6 +46,15 @@ import org.slf4j.LoggerFactory;
  */
 public class CryptFilter extends IoFilter implements CryptStatisticsProvider {
 
+    /**
+     * The maximum padding length we use. RFC 4253: at least 4 bytes padding, 
at most 255 bytes.
+     * <p>
+     * Keep the padding size &lt;= 127, though: JSch has a bug where it reads 
the pad byte as a signed value when
+     * compression is used!
+     * </p>
+     */
+    public static final int MAX_PADDING = 127;
+
     private static final Logger LOG = 
LoggerFactory.getLogger(CryptFilter.class);
 
     // The minimum value for the packet length field of a valid SSH packet:
@@ -58,11 +67,6 @@ public class CryptFilter extends IoFilter implements 
CryptStatisticsProvider {
     // is 8 bytes.
     private static final int MIN_PACKET_LENGTH = 8;
 
-    // RFC 4253: at least 4 bytes padding, at most 255 bytes.
-    //
-    // Keep the padding size <= 127, though: JSch has a bug where it reads the 
pad byte as a signed int!
-    private static final int MAX_PADDING = 127;
-
     private static final int UNKNOWN_PACKET_LENGTH = -1;
 
     private final AtomicReference<Settings> decryption = new 
AtomicReference<>();
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/session/filters/InjectIgnoreFilter.java
 
b/sshd-core/src/main/java/org/apache/sshd/common/session/filters/InjectIgnoreFilter.java
index 34e670c1f..9f49e46e3 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/common/session/filters/InjectIgnoreFilter.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/common/session/filters/InjectIgnoreFilter.java
@@ -20,16 +20,15 @@ package org.apache.sshd.common.session.filters;
 
 import java.io.IOException;
 import java.util.Objects;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.sshd.common.PropertyResolver;
 import org.apache.sshd.common.SshConstants;
 import org.apache.sshd.common.filter.InputHandler;
 import org.apache.sshd.common.filter.IoFilter;
 import org.apache.sshd.common.filter.OutputHandler;
-import org.apache.sshd.common.io.DefaultIoWriteFuture;
+import org.apache.sshd.common.io.IoWriteFuture;
 import org.apache.sshd.common.random.Random;
+import org.apache.sshd.common.session.Session;
 import org.apache.sshd.common.util.buffer.Buffer;
 import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
 import org.apache.sshd.core.CoreModuleProperties;
@@ -37,89 +36,26 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * A filter that injects ignore messages depending on the configuration 
settings.
+ * A filter that injects SSH_MSG_IGNORE messages depending on the 
configuration settings.
+ * <p>
+ * This filter should be placed below the KexFilter to ensure that it doesn't 
inject ignore messages during KEX.
+ * </p>
  */
 public class InjectIgnoreFilter extends IoFilter {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(InjectIgnoreFilter.class);
 
-    private static final long DISABLED = -1;
-
     private final PropertyResolver resolver;
 
     private final Random random;
 
-    private final AtomicReference<Settings> settings = new AtomicReference<>();
-
-    private final AtomicLong ignoreCount = new AtomicLong(DISABLED);
+    private final OutputHandler output = new Injector();
 
     public InjectIgnoreFilter(PropertyResolver resolver, Random random) {
         this.resolver = Objects.requireNonNull(resolver);
         this.random = Objects.requireNonNull(random);
     }
 
-    private Settings getSettings() {
-        Settings result = settings.get();
-        if (result == null) {
-            int length = 
CoreModuleProperties.IGNORE_MESSAGE_SIZE.getRequired(resolver);
-            long frequency = 
CoreModuleProperties.IGNORE_MESSAGE_FREQUENCY.getRequired(resolver);
-            int variance = 
CoreModuleProperties.IGNORE_MESSAGE_VARIANCE.getRequired(resolver);
-            if (variance >= frequency) {
-                variance = 0;
-            }
-            result = new Settings(length, frequency, variance);
-            settings.set(result);
-            ignoreCount.set(calculateNextIgnorePacketCount(result));
-        }
-        return result;
-    }
-
-    private long calculateNextIgnorePacketCount(Settings s) {
-        if ((s.frequency <= 0) || (s.variance < 0)) {
-            return DISABLED;
-        }
-        if (s.variance == 0) {
-            return s.frequency;
-        }
-        int extra = random.random(Math.abs(s.variance));
-        long count = (s.variance < 0) ? (s.frequency - extra) : (s.frequency + 
extra);
-        if (LOG.isTraceEnabled()) {
-            LOG.trace("calculateNextIgnorePacketCount({}) count={}", resolver, 
count);
-        }
-
-        return count;
-    }
-
-    private int shouldSendIgnore(int cmd) {
-        if (cmd <= SshConstants.SSH_MSG_KEX_LAST) {
-            return 0;
-        }
-        Settings s = getSettings();
-        if (s.length <= 0) {
-            return 0;
-        }
-        long count = ignoreCount.decrementAndGet();
-        if (count < 0) {
-            ignoreCount.set(DISABLED);
-        } else if (count == 0) {
-            ignoreCount.set(calculateNextIgnorePacketCount(s));
-            return s.length;
-        }
-        return 0;
-    }
-
-    private Buffer createIgnoreBuffer(int length) {
-        int size = length + random.random(length + 1);
-        Buffer buffer = new ByteArrayBuffer(SshConstants.SSH_PACKET_HEADER_LEN 
+ 1 + size + 255 + 64);
-        buffer.rpos(SshConstants.SSH_PACKET_HEADER_LEN);
-        buffer.wpos(SshConstants.SSH_PACKET_HEADER_LEN);
-        buffer.putByte(SshConstants.SSH_MSG_IGNORE);
-        int start = buffer.wpos();
-        buffer.wpos(buffer.wpos() + size);
-        random.fill(buffer.array(), start, size);
-        return buffer;
-    }
-
     @Override
     public InputHandler in() {
         return null;
@@ -127,33 +63,93 @@ public class InjectIgnoreFilter extends IoFilter {
 
     @Override
     public OutputHandler out() {
-        // TODO: problem here: If we do this via future chaining, then a 
subsequent call may actually overtake the
-        // message, which may break KEX.
-        // If we do lastWrite, we may get an unbounded chain of futures. Yuck.
-        // If we place this filter above the KEX filter, it would be OK once 
we have the KEX filter with its queue in
-        // place.
-        return message -> {
+        return output;
+    }
+
+    private class Injector implements OutputHandler {
+
+        private Settings settings;
+
+        private long ignoreCount;
+
+        Injector() {
+            super();
+        }
+
+        @Override
+        public synchronized IoWriteFuture send(Buffer message) throws 
IOException {
             int cmd = message.rawByte(message.rpos()) & 0xFF;
             int length = shouldSendIgnore(cmd);
-            if (length == 0) {
-                return owner().send(InjectIgnoreFilter.this, message);
+            if (length > 0) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Injector.send({}) injecting SSH_MSG_IGNORE", 
resolver);
+                }
+                owner().send(InjectIgnoreFilter.this, 
createIgnoreBuffer(length)).addListener(f -> {
+                    Throwable t = f.getException();
+                    if (t != null && (resolver instanceof Session)) {
+                        ((Session) resolver).exceptionCaught(t);
+                    }
+                });
             }
-            DefaultIoWriteFuture result = new DefaultIoWriteFuture(this, null);
-            owner().send(this, 
createIgnoreBuffer(length)).addListener(baseSent -> {
-                Throwable t = baseSent.getException();
-                if (t != null) {
-                    result.setValue(t);
+            return owner().send(InjectIgnoreFilter.this, message);
+        }
+
+        private Settings getSettings() {
+            if (settings == null) {
+                int length = 
CoreModuleProperties.IGNORE_MESSAGE_SIZE.getRequired(resolver);
+                long frequency = 
CoreModuleProperties.IGNORE_MESSAGE_FREQUENCY.getRequired(resolver);
+                int variance = 
CoreModuleProperties.IGNORE_MESSAGE_VARIANCE.getRequired(resolver);
+                if (variance >= frequency) {
+                    variance = 0;
                 }
-                try {
-                    owner().send(InjectIgnoreFilter.this, 
message).addListener(sent -> {
-                        result.setValue(sent.isWritten() ? Boolean.TRUE : 
sent.getException());
-                    });
-                } catch (IOException e) {
-                    result.setValue(e);
+                settings = new Settings(length, frequency, variance);
+                if (!settings.isDisabled()) {
+                    ignoreCount = calculateNextIgnorePacketCount(settings);
                 }
-            });
-            return result;
-        };
+            }
+            return settings;
+        }
+
+        private long calculateNextIgnorePacketCount(Settings s) {
+            if (s.variance == 0) {
+                return s.frequency;
+            }
+            int extra = random.random(Math.abs(s.variance));
+            long count = (s.variance < 0) ? (s.frequency - extra) : 
(s.frequency + extra);
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("calculateNextIgnorePacketCount({}) count={}", 
resolver, count);
+            }
+
+            return count;
+        }
+
+        private int shouldSendIgnore(int cmd) {
+            if (cmd <= SshConstants.SSH_MSG_KEX_LAST) {
+                return 0;
+            }
+            Settings s = getSettings();
+            if (s.isDisabled()) {
+                return 0;
+            }
+            long count = --ignoreCount;
+            if (count == 0) {
+                ignoreCount = calculateNextIgnorePacketCount(s);
+                return s.length;
+            }
+            return 0;
+        }
+
+        private Buffer createIgnoreBuffer(int length) {
+            int size = length + random.random(length + 1);
+            Buffer buffer = new 
ByteArrayBuffer(SshConstants.SSH_PACKET_HEADER_LEN + 1 + size + 
CryptFilter.MAX_PADDING + 64);
+            buffer.rpos(SshConstants.SSH_PACKET_HEADER_LEN);
+            buffer.wpos(SshConstants.SSH_PACKET_HEADER_LEN);
+            buffer.putByte(SshConstants.SSH_MSG_IGNORE);
+            int start = buffer.wpos();
+            buffer.wpos(buffer.wpos() + size);
+            random.fill(buffer.array(), start, size);
+            return buffer;
+        }
     }
 
     private static class Settings {
@@ -166,5 +162,9 @@ public class InjectIgnoreFilter extends IoFilter {
             this.frequency = frequency;
             this.variance = variance;
         }
+
+        boolean isDisabled() {
+            return length <= 0 || frequency <= 0 || variance < 0;
+        }
     }
 }
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java
 
b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java
index 2e5c54d20..34f721871 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java
@@ -77,6 +77,7 @@ import 
org.apache.sshd.common.session.filters.CompressionFilter;
 import org.apache.sshd.common.session.filters.CryptFilter;
 import org.apache.sshd.common.session.filters.DelayKexInitFilter;
 import org.apache.sshd.common.session.filters.IdentFilter;
+import org.apache.sshd.common.session.filters.InjectIgnoreFilter;
 import org.apache.sshd.common.session.filters.SshIdentHandler;
 import org.apache.sshd.common.session.filters.kex.KexFilter;
 import org.apache.sshd.common.session.filters.kex.KexListener;
@@ -335,6 +336,8 @@ public abstract class AbstractSession extends SessionHelper 
{
         compressionFilter.setSession(this);
         filters.addLast(compressionFilter);
 
+        filters.addLast(new InjectIgnoreFilter(this, random));
+
         DelayKexInitFilter delayKexFilter = new DelayKexInitFilter();
         delayKexFilter.setSession(this);
         filters.addLast(delayKexFilter);

Reply via email to