This is an automated email from the ASF dual-hosted git repository.

twolf pushed a commit to branch dev_3.0
in repository https://gitbox.apache.org/repos/asf/mina-sshd.git

commit 2d241fce783eaec641a76ae06f6eac1fef1862a9
Author: Thomas Wolf <tw...@apache.org>
AuthorDate: Sun Feb 16 10:46:46 2025 +0100

    Introduce a filter chain
    
    Add a simple filter chain to an AbstractSession. Currently it doesn't do
    much, but this lays the foundation to migrate parts of that huge session
    into separate filters.
    
    All tests pass.
    
    Future changes will add filters and later also improve this filter chain
    itself.
---
 .../sshd/client/session/ClientSessionImpl.java     |   1 +
 .../sshd/common/filter/BufferInputHandler.java     |  40 ++++
 .../sshd/common/filter/DefaultFilterChain.java     | 218 +++++++++++++++++++++
 .../java/org/apache/sshd/common/filter/Filter.java |  39 ++++
 .../org/apache/sshd/common/filter/FilterChain.java |  81 ++++++++
 .../apache/sshd/common/filter/InputHandler.java    |  36 ++++
 .../org/apache/sshd/common/filter/IoFilter.java    |  72 +++++++
 .../apache/sshd/common/filter/OutputHandler.java   |  41 ++++
 .../java/org/apache/sshd/common/filter/Owned.java  |  40 ++++
 .../org/apache/sshd/common/session/Session.java    |   8 +
 .../common/session/helpers/AbstractSession.java    |  49 ++++-
 .../session/helpers/AbstractSessionIoHandler.java  |   2 +-
 .../sshd/server/session/ServerSessionImpl.java     |   1 +
 .../session/helpers/AbstractSessionTest.java       |   3 +-
 14 files changed, 625 insertions(+), 6 deletions(-)

diff --git 
a/sshd-core/src/main/java/org/apache/sshd/client/session/ClientSessionImpl.java 
b/sshd-core/src/main/java/org/apache/sshd/client/session/ClientSessionImpl.java
index cb5b298dc..0c03c40ec 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/client/session/ClientSessionImpl.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/client/session/ClientSessionImpl.java
@@ -85,6 +85,7 @@ public class ClientSessionImpl extends AbstractClientSession {
 
     @Override
     public void start() throws Exception {
+        super.start();
         /*
          * Must be called regardless of whether the client identification is 
sent or not immediately in order to allow
          * opening any underlying proxy protocol - e.g., SOCKS or HTTP CONNECT 
- otherwise the server's identification
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/filter/BufferInputHandler.java 
b/sshd-core/src/main/java/org/apache/sshd/common/filter/BufferInputHandler.java
new file mode 100644
index 000000000..39a661e2d
--- /dev/null
+++ 
b/sshd-core/src/main/java/org/apache/sshd/common/filter/BufferInputHandler.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.filter;
+
+import org.apache.sshd.common.util.Readable;
+import org.apache.sshd.common.util.buffer.Buffer;
+
+/**
+ * A specialized {@link InputHandler} that assumes that the message is already an Apache MINA sshd {@link Buffer}.
+ */
+public interface BufferInputHandler extends InputHandler {
+
+    /**
+     * Forwards the message to {@link #handleMessage(Buffer)} if it is a {@link Buffer}.
+     *
+     * @param  message                  the received message
+     * @throws IllegalArgumentException if {@code message} is not a {@link Buffer}
+     * @throws Exception                if {@link #handleMessage(Buffer)} fails
+     */
+    @Override
+    default void received(Readable message) throws Exception {
+        if (!(message instanceof Buffer)) {
+            // Name the offending type so that a mis-wired filter chain is easy to diagnose.
+            throw new IllegalArgumentException("Expected a Buffer but got "
+                                               + (message == null ? "null" : message.getClass().getName()));
+        }
+        handleMessage((Buffer) message);
+    }
+
+    /**
+     * Handles a received message.
+     *
+     * @param  message   the received {@link Buffer}
+     * @throws Exception if an error occurs in handling the message
+     */
+    void handleMessage(Buffer message) throws Exception;
+
+}
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/filter/DefaultFilterChain.java 
b/sshd-core/src/main/java/org/apache/sshd/common/filter/DefaultFilterChain.java
new file mode 100644
index 000000000..9e23b2f31
--- /dev/null
+++ 
b/sshd-core/src/main/java/org/apache/sshd/common/filter/DefaultFilterChain.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.filter;
+
+import java.io.IOException;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import org.apache.sshd.common.io.IoWriteFuture;
+import org.apache.sshd.common.session.Session;
+import org.apache.sshd.common.util.Readable;
+import org.apache.sshd.common.util.buffer.Buffer;
+
+/**
+ * A default implementation of a {@link FilterChain}.
+ */
+public class DefaultFilterChain implements FilterChain {
+
+    private volatile Session session;
+
+    private final CopyOnWriteArrayList<Filter> chain = new 
CopyOnWriteArrayList<>();
+
+    public DefaultFilterChain() {
+        super();
+    }
+
+    // Lifecycle methods
+
+    @Override
+    public void init() {
+        // Nothing
+    }
+
+    @Override
+    public void adding(Session owner) {
+        // Nothing
+    }
+
+    /**
+     * Records the {@link Session} that now owns this filter chain.
+     *
+     * @param owner the owning session; returned by {@link #owner()} afterwards
+     */
+    @Override
+    public void added(Session owner) {
+        // Store the parameter: assigning the field to itself would leave owner() returning null.
+        this.session = owner;
+    }
+
+    @Override
+    public void removing() {
+        session = null;
+        Filter[] filters = chain.toArray(new Filter[0]);
+        for (Filter f : filters) {
+            f.removing();
+        }
+        chain.clear();
+        for (Filter f : filters) {
+            f.removed(this);
+        }
+    }
+
+    @Override
+    public void removed(Session owner) {
+        // Nothing
+    }
+
+    @Override
+    public Session owner() {
+        return session;
+    }
+
+    // Filter chain operations
+
+    private Filter notDuplicate(Filter filter) {
+        if (chain.indexOf(Objects.requireNonNull(filter)) >= 0) {
+            throw new IllegalStateException("Duplicate filter " + filter);
+        }
+        return filter;
+    }
+
+    private void addAt(int i, Filter filter) {
+        if (i < 0) {
+            throw new NoSuchElementException();
+        }
+        notDuplicate(filter).adding(this);
+        chain.add(i, filter);
+        filter.added(this);
+    }
+
+    @Override
+    public void addFirst(Filter filter) {
+        addAt(0, filter);
+    }
+
+    @Override
+    public void addLast(Filter filter) {
+        addAt(chain.size(), filter);
+    }
+
+    @Override
+    public void addBefore(Filter toAdd, Filter before) {
+        addAt(chain.indexOf(Objects.requireNonNull(before)), 
notDuplicate(toAdd));
+    }
+
+    /**
+     * Inserts {@code toAdd} directly after {@code after}.
+     *
+     * @param  toAdd                  the filter to insert
+     * @param  after                  the filter after which {@code toAdd} is inserted
+     * @throws NoSuchElementException if {@code after} is not in the chain
+     * @throws IllegalStateException  if {@code toAdd} is already in the chain
+     */
+    @Override
+    public void addAfter(Filter toAdd, Filter after) {
+        int anchor = chain.indexOf(Objects.requireNonNull(after));
+        if (anchor < 0) {
+            // Without this check, -1 + 1 == 0 would silently insert at the front of the chain.
+            throw new NoSuchElementException();
+        }
+        addAt(anchor + 1, notDuplicate(toAdd));
+    }
+
+    @Override
+    public void remove(Filter filter) {
+        int i = chain.indexOf(Objects.requireNonNull(filter));
+        if (i < 0) {
+            throw new IllegalArgumentException("Filter not in filter chain " + 
filter);
+        }
+        filter.removing();
+        chain.remove(filter);
+        filter.removed(this);
+    }
+
+    /**
+     * Replaces {@code oldFilter} by {@code newFilter} at the same position in the chain. Does nothing if both are
+     * equal.
+     *
+     * @param  oldFilter                the filter to remove; must be in the chain
+     * @param  newFilter                the filter to put in its place; must not be in the chain already
+     * @throws IllegalArgumentException if {@code oldFilter} is not in the chain
+     * @throws IllegalStateException    if {@code newFilter} is already in the chain
+     */
+    @Override
+    public void replace(Filter oldFilter, Filter newFilter) {
+        if (oldFilter.equals(Objects.requireNonNull(newFilter))) {
+            return;
+        }
+        int i = chain.indexOf(oldFilter);
+        if (i < 0) {
+            throw new IllegalArgumentException("Filter not in filter chain " + oldFilter);
+        }
+        // Same invariant as the add* methods: a filter may occur only once, since navigation relies on indexOf().
+        notDuplicate(newFilter);
+        oldFilter.removing();
+        chain.remove(i);
+        oldFilter.removed(this);
+        newFilter.adding(this);
+        chain.add(i, newFilter);
+        newFilter.added(this);
+    }
+
+    @Override
+    public Filter getFirst() {
+        return chain.isEmpty() ? null : chain.get(0);
+    }
+
+    @Override
+    public Filter getLast() {
+        int i = chain.size();
+        if (i == 0) {
+            return null;
+        }
+        return chain.get(i - 1);
+    }
+
+    @Override
+    public Filter getNext(Filter from) {
+        int i = chain.indexOf(from);
+        if (i < 0) {
+            throw new IllegalArgumentException("Filter not in filter chain: " 
+ from);
+        }
+        if (i == chain.size() - 1) {
+            return null;
+        }
+        return chain.get(i + 1);
+    }
+
+    @Override
+    public Filter getPrevious(Filter from) {
+        int i = chain.indexOf(from);
+        if (i < 0) {
+            throw new IllegalArgumentException("Filter not in filter chain: " 
+ from);
+        }
+        return i == 0 ? null : chain.get(i - 1);
+    }
+
+    @Override
+    public IoWriteFuture send(Filter current, Buffer message) throws 
IOException {
+        int i = chain.indexOf(current);
+        if (i < 0) {
+            throw new IllegalArgumentException("Filter not in filter chain: " 
+ current);
+        }
+        for (int j = i - 1; j >= 0; j--) {
+            Filter f = chain.get(j);
+            OutputHandler handler = f.out();
+            if (handler != null) {
+                return handler.send(message);
+            }
+        }
+        throw new IllegalStateException("Fell off filter chain in send from " 
+ current);
+    }
+
+    @Override
+    public void passOn(Filter current, Readable message) throws Exception {
+        int i = chain.indexOf(current);
+        if (i < 0) {
+            throw new IllegalArgumentException("Filter not in filter chain: " 
+ current);
+        }
+        for (int j = i + 1; j < chain.size(); j++) {
+            Filter f = chain.get(j);
+            InputHandler handler = f.in();
+            if (handler != null) {
+                handler.received(message);
+                return;
+            }
+        }
+        throw new IllegalStateException("Unhandled message: fell off filter 
chain in receive after " + current);
+    }
+
+}
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/filter/Filter.java 
b/sshd-core/src/main/java/org/apache/sshd/common/filter/Filter.java
new file mode 100644
index 000000000..8b6897d15
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/filter/Filter.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.filter;
+
+/**
+ * A message filter belonging to a {@link FilterChain}.
+ */
+public interface Filter extends Owned<FilterChain> {
+
+    /**
+     * Retrieves the filter's {@link InputHandler}.
+     *
+     * @return the {@link InputHandler} or {@code null} if this filter is an output-only filter
+     */
+    InputHandler in();
+
+    /**
+     * Retrieves the filter's {@link OutputHandler}.
+     *
+     * @return the {@link OutputHandler} or {@code null} if this filter is an input-only filter
+     */
+    OutputHandler out();
+}
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/filter/FilterChain.java 
b/sshd-core/src/main/java/org/apache/sshd/common/filter/FilterChain.java
new file mode 100644
index 000000000..1f41212b3
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/filter/FilterChain.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.filter;
+
+import java.io.IOException;
+
+import org.apache.sshd.common.io.IoWriteFuture;
+import org.apache.sshd.common.session.Session;
+import org.apache.sshd.common.util.Readable;
+import org.apache.sshd.common.util.buffer.Buffer;
+
+/**
+ * A general chain of {@link Filter}s owned by a {@link Session}.
+ */
+public interface FilterChain extends Owned<Session> {
+
+    /**
+     * Adds the given filter at the front of the filter chain.
+     *
+     * @param filter to add
+     */
+    void addFirst(Filter filter);
+
+    /**
+     * Adds the given filter at the end of the filter chain.
+     *
+     * @param filter to add
+     */
+    void addLast(Filter filter);
+
+    void addBefore(Filter toAdd, Filter before);
+
+    void addAfter(Filter toAdd, Filter after);
+
+    void remove(Filter filter);
+
+    void replace(Filter oldFilter, Filter newFilter);
+
+    Filter getFirst();
+
+    Filter getLast();
+
+    Filter getNext(Filter from);
+
+    Filter getPrevious(Filter from);
+
+    /**
+     * Pass on an outgoing message to the next filter before {@code current} 
that has an {@link OutputHandler}.
+     *
+     * @param  current     {@link Filter} that is passing on the message
+     * @param  message     being passed on
+     * @return             an {@link IoWriteFuture} that is fulfilled when the 
message has been sent.
+     * @throws IOException if an error occurs
+     */
+    IoWriteFuture send(Filter current, Buffer message) throws IOException;
+
+    /**
+     * Pass on an incoming message to the next filter after {@code current} 
that has an {@link InputHandler}.
+     *
+     * @param  current   {@link Filter} that is passing on the message
+     * @param  message   being passed on
+     * @throws Exception if an error occurs
+     */
+    void passOn(Filter current, Readable message) throws Exception;
+}
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/filter/InputHandler.java 
b/sshd-core/src/main/java/org/apache/sshd/common/filter/InputHandler.java
new file mode 100644
index 000000000..426220bfd
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/filter/InputHandler.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.filter;
+
+import org.apache.sshd.common.util.Readable;
+
+/**
+ * A general handler for incoming messages.
+ */
+public interface InputHandler {
+
+    /**
+     * Consumes the incoming message.
+     *
+     * @param  message   that was received
+     * @throws Exception if an error occurs in handling the message
+     */
+    void received(Readable message) throws Exception;
+
+}
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/filter/IoFilter.java 
b/sshd-core/src/main/java/org/apache/sshd/common/filter/IoFilter.java
new file mode 100644
index 000000000..97e3cd943
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/filter/IoFilter.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.filter;
+
+import java.util.Objects;
+
+/**
+ * A base implementation of a {@link Filter}.
+ */
+public abstract class IoFilter implements Filter {
+
+    private volatile FilterChain chain;
+
+    protected IoFilter() {
+        super();
+    }
+
+    @Override
+    public void init() {
+        // Nothing
+    }
+
+    @Override
+    public void adding(FilterChain chain) {
+        // Nothing
+    }
+
+    @Override
+    public void added(FilterChain chain) {
+        this.chain = Objects.requireNonNull(chain);
+    }
+
+    @Override
+    public void removing() {
+        chain = null;
+    }
+
+    @Override
+    public void removed(FilterChain chain) {
+        // Nothing
+    }
+
+    @Override
+    public FilterChain owner() {
+        return chain;
+    }
+
+    /**
+     * Retrieves the chain this filter belongs to, insisting that the chain itself has an owner.
+     *
+     * @return                       the owning {@link FilterChain}; never {@code null}
+     * @throws IllegalStateException if this filter has no chain, or the chain has no owner
+     */
+    protected FilterChain active() {
+        // Read the field once so that both checks and the return value refer to the same chain.
+        FilterChain myChain = chain;
+        if (myChain == null) {
+            throw new IllegalStateException("Filter is not in a filter chain: " + this);
+        }
+        if (myChain.owner() == null) {
+            throw new IllegalStateException("Filter chain has no owner: " + myChain);
+        }
+        return myChain;
+    }
+
+}
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/filter/OutputHandler.java 
b/sshd-core/src/main/java/org/apache/sshd/common/filter/OutputHandler.java
new file mode 100644
index 000000000..05786d47d
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/filter/OutputHandler.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.filter;
+
+import java.io.IOException;
+
+import org.apache.sshd.common.io.IoWriteFuture;
+import org.apache.sshd.common.util.buffer.Buffer;
+
+/**
+ * A general handler for outgoing messages.
+ */
+public interface OutputHandler {
+
+    /**
+     * Sends an outgoing message.
+     *
+     * @param  message     {@link Buffer} containing the message; not to be re-used before the returned future is
+     *                     fulfilled
+     * @return             an {@link IoWriteFuture} that will be fulfilled once the message has been sent.
+     * @throws IOException if an error occurs in handling the message
+     */
+    IoWriteFuture send(Buffer message) throws IOException;
+
+}
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/filter/Owned.java 
b/sshd-core/src/main/java/org/apache/sshd/common/filter/Owned.java
new file mode 100644
index 000000000..5b8d1fb32
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/filter/Owned.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.filter;
+
+/**
+ * A general abstraction for something being strongly owned by something else and needing life-cycle management.
+ *
+ * @param <T> the type of the owner
+ */
+public interface Owned<T> {
+
+    /**
+     * Life-cycle hook for initialization. NOTE(review): nothing in this change invokes it yet; the exact point in the
+     * life-cycle at which it is meant to be called still needs to be specified.
+     */
+    void init();
+
+    /**
+     * Notification that this object is about to be attached to the given owner. The default filter chain calls this
+     * before it inserts a filter.
+     *
+     * @param owner the future owner
+     */
+    void adding(T owner);
+
+    /**
+     * Notification that this object has been attached to the given owner. The default filter chain calls this after
+     * it has inserted a filter.
+     *
+     * @param owner the new owner
+     */
+    void added(T owner);
+
+    /**
+     * Notification that this object is about to be detached from its owner. The default filter chain calls this
+     * before it takes a filter out of the chain.
+     */
+    void removing();
+
+    /**
+     * Notification that this object has been detached from the given owner. The default filter chain calls this
+     * after it has taken a filter out of the chain.
+     *
+     * @param owner the former owner
+     */
+    void removed(T owner);
+
+    /**
+     * Retrieves the current owner.
+     *
+     * @return the owner; the implementations in this change return {@code null} while the object is not attached
+     */
+    T owner();
+
+}
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/session/Session.java 
b/sshd-core/src/main/java/org/apache/sshd/common/session/Session.java
index 07eb80fd7..eb1badbb5 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/session/Session.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/Session.java
@@ -32,6 +32,7 @@ import org.apache.sshd.common.Service;
 import org.apache.sshd.common.auth.MutableUserHolder;
 import org.apache.sshd.common.channel.ChannelListenerManager;
 import 
org.apache.sshd.common.channel.throttle.ChannelStreamWriterResolverManager;
+import org.apache.sshd.common.filter.FilterChain;
 import org.apache.sshd.common.forward.PortForwardingEventListenerManager;
 import org.apache.sshd.common.forward.PortForwardingInformationProvider;
 import org.apache.sshd.common.future.GlobalRequestFuture;
@@ -65,6 +66,13 @@ public interface Session
         FactoryManagerHolder,
         PortForwardingInformationProvider {
 
+    /**
+     * Retrieves the {@link FilterChain} of the session.
+     *
+     * @return the {@link FilterChain}; never {@code null}
+     */
+    FilterChain getFilterChain();
+
     /**
      * Create a new buffer for the specified SSH packet and reserve the needed 
space (5 bytes) for the packet header.
      *
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java
 
b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java
index 029808107..8ced9f0ea 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java
@@ -67,6 +67,11 @@ import org.apache.sshd.common.cipher.CipherInformation;
 import org.apache.sshd.common.compression.Compression;
 import org.apache.sshd.common.compression.CompressionInformation;
 import org.apache.sshd.common.digest.Digest;
+import org.apache.sshd.common.filter.DefaultFilterChain;
+import org.apache.sshd.common.filter.FilterChain;
+import org.apache.sshd.common.filter.InputHandler;
+import org.apache.sshd.common.filter.IoFilter;
+import org.apache.sshd.common.filter.OutputHandler;
 import org.apache.sshd.common.forward.PortForwardingEventListener;
 import org.apache.sshd.common.future.DefaultKeyExchangeFuture;
 import org.apache.sshd.common.future.DefaultSshFuture;
@@ -302,6 +307,8 @@ public abstract class AbstractSession extends SessionHelper 
{
 
     private final Map<Buffer, LongConsumer> globalSequenceNumbers = new 
ConcurrentHashMap<>();
 
+    private final FilterChain filters = new DefaultFilterChain();
+
     private byte[] clientKexData; // the payload of the client's 
SSH_MSG_KEXINIT
     private byte[] serverKexData; // the payload of the server's 
SSH_MSG_KEXINIT
 
@@ -337,6 +344,9 @@ public abstract class AbstractSession extends SessionHelper 
{
         tunnelListenerProxy = EventListenerUtils.proxyWrapper(
                 PortForwardingEventListener.class, tunnelListeners);
 
+        filters.adding(this);
+        filters.added(this);
+
         try {
             signalSessionEstablished(ioSession);
         } catch (RuntimeException e) {
@@ -352,7 +362,40 @@ public abstract class AbstractSession extends 
SessionHelper {
      *
      * @throws Exception on errors
      */
-    protected abstract void start() throws Exception;
+    protected void start() throws Exception {
+        IoFilter ioSessionConnector = new IoFilter() {
+
+            @Override
+            public InputHandler in() {
+                return message -> owner().passOn(this, message);
+            }
+
+            @Override
+            public OutputHandler out() {
+                return message -> getIoSession().writeBuffer(message);
+            }
+        };
+        // Temporary. This is work in progress, and actually the whole stuff 
is still handled by the SSH session.
+        // The idea is to migrate parts step by step into filters on this 
filter chain.
+        IoFilter sessionConnector = new IoFilter() {
+            @Override
+            public InputHandler in() {
+                return AbstractSession.this::messageReceived;
+            }
+
+            @Override
+            public OutputHandler out() {
+                return message -> owner().send(this, message);
+            }
+        };
+        filters.addLast(sessionConnector);
+        filters.addFirst(ioSessionConnector);
+    }
+
+    @Override
+    public FilterChain getFilterChain() {
+        return filters;
+    }
 
     /**
      * Creates a new {@link KeyExchangeMessageHandler} instance managing 
packet sending for this session.
@@ -1171,9 +1214,7 @@ public abstract class AbstractSession extends 
SessionHelper {
         // packets are sent in the correct order
         synchronized (encodeLock) {
             Buffer packet = resolveOutputPacket(buffer);
-            IoSession networkSession = getIoSession();
-            IoWriteFuture future = networkSession.writeBuffer(packet);
-            return future;
+            return filters.getLast().out().send(packet);
         }
     }
 
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSessionIoHandler.java
 
b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSessionIoHandler.java
index feed5db66..53fae497f 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSessionIoHandler.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSessionIoHandler.java
@@ -73,7 +73,7 @@ public abstract class AbstractSessionIoHandler extends 
AbstractLoggingBean imple
     public void messageReceived(IoSession ioSession, Readable message) throws 
Exception {
         AbstractSession session = AbstractSession.getSession(ioSession);
         try {
-            session.messageReceived(message);
+            session.getFilterChain().getFirst().in().received(message);
         } catch (Error e) {
             if (log.isDebugEnabled()) {
                 log.debug("messageReceived({}) failed {} to handle message: 
{}", ioSession, e.getClass().getSimpleName(),
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/server/session/ServerSessionImpl.java 
b/sshd-core/src/main/java/org/apache/sshd/server/session/ServerSessionImpl.java
index cbabc3585..e3e7ae2f6 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/server/session/ServerSessionImpl.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/server/session/ServerSessionImpl.java
@@ -40,6 +40,7 @@ public class ServerSessionImpl extends AbstractServerSession {
 
     @Override
     public void start() throws Exception {
+        super.start();
         String headerConfig = 
CoreModuleProperties.SERVER_EXTRA_IDENTIFICATION_LINES.getOrNull(this);
         String[] headers = GenericUtils.split(headerConfig, 
CoreModuleProperties.SERVER_EXTRA_IDENT_LINES_SEPARATOR);
         // We intentionally create a modifiable array so as to allow users to
diff --git 
a/sshd-core/src/test/java/org/apache/sshd/common/session/helpers/AbstractSessionTest.java
 
b/sshd-core/src/test/java/org/apache/sshd/common/session/helpers/AbstractSessionTest.java
index d4b2df0c5..86fe164dc 100644
--- 
a/sshd-core/src/test/java/org/apache/sshd/common/session/helpers/AbstractSessionTest.java
+++ 
b/sshd-core/src/test/java/org/apache/sshd/common/session/helpers/AbstractSessionTest.java
@@ -75,6 +75,7 @@ public class AbstractSessionTest extends BaseTestSupport {
     @BeforeEach
     void setUp() throws Exception {
         session = new MySession();
+        session.start();
     }
 
     @AfterEach
@@ -455,7 +456,7 @@ public class AbstractSessionTest extends BaseTestSupport {
 
         @Override
         public void start() throws Exception {
-            // Nothing to do for this test
+            super.start();
         }
 
         @Override

Reply via email to