This is an automated email from the ASF dual-hosted git repository.

twolf pushed a commit to branch dev_3.0
in repository https://gitbox.apache.org/repos/asf/mina-sshd.git

commit 056e3044e4d449767d24e0b7459dcc1ed9e46889
Author: Thomas Wolf <tw...@apache.org>
AuthorDate: Sun Apr 6 15:30:29 2025 +0200

    Drop ThreadUtils.isInternalThread()
    
    This was used in version 2.X to decide whether a thread writing messages
    during KEX could be blocked. In 3.X, we never block threads explicitly
    during KEX; we just close all channel windows, which will implicitly
    block data pumping threads until KEX is over.
---
 .../sshd/common/future/AbstractSshFuture.java      |  6 +-
 .../sshd/common/util/threads/ThreadUtils.java      | 64 ----------------------
 .../sshd/common/forward/DefaultForwarder.java      |  5 +-
 .../org/apache/sshd/common/forward/SocksProxy.java |  4 +-
 .../sshd/server/forward/TcpipServerChannel.java    |  4 +-
 5 files changed, 6 insertions(+), 77 deletions(-)

diff --git a/sshd-common/src/main/java/org/apache/sshd/common/future/AbstractSshFuture.java b/sshd-common/src/main/java/org/apache/sshd/common/future/AbstractSshFuture.java
index 7b52dca16..a8e3c16c5 100644
--- a/sshd-common/src/main/java/org/apache/sshd/common/future/AbstractSshFuture.java
+++ b/sshd-common/src/main/java/org/apache/sshd/common/future/AbstractSshFuture.java
@@ -31,7 +31,6 @@ import org.apache.sshd.common.SshException;
 import org.apache.sshd.common.util.ExceptionUtils;
 import org.apache.sshd.common.util.GenericUtils;
 import org.apache.sshd.common.util.logging.AbstractLoggingBean;
-import org.apache.sshd.common.util.threads.ThreadUtils;
 
 /**
  * @param  <T> Type of future
@@ -174,10 +173,7 @@ public abstract class AbstractSshFuture<T extends SshFuture<T>> extends Abstract
     protected void notifyListener(SshFutureListener<T> l) {
         try {
             T arg = asT();
-            ThreadUtils.runAsInternal(() -> {
-                l.operationComplete(arg);
-                return null;
-            });
+            l.operationComplete(arg);
         } catch (Throwable t) {
             warn("notifyListener({}) failed ({}) to invoke {}: {}",
                     this, t.getClass().getSimpleName(), l, t.getMessage(), t);
diff --git a/sshd-common/src/main/java/org/apache/sshd/common/util/threads/ThreadUtils.java b/sshd-common/src/main/java/org/apache/sshd/common/util/threads/ThreadUtils.java
index d5e84d05b..a1cdf4c3d 100644
--- a/sshd-common/src/main/java/org/apache/sshd/common/util/threads/ThreadUtils.java
+++ b/sshd-common/src/main/java/org/apache/sshd/common/util/threads/ThreadUtils.java
@@ -18,10 +18,8 @@
  */
 package org.apache.sshd.common.util.threads;
 
-import java.io.IOException;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
-import java.util.concurrent.Callable;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -31,7 +29,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 
 import org.apache.sshd.common.util.ReflectionUtils;
-import org.apache.sshd.common.util.io.functors.IOFunction;
 
 /**
  * Utility class for thread pools.
@@ -40,71 +37,10 @@ import org.apache.sshd.common.util.io.functors.IOFunction;
  */
 public final class ThreadUtils {
 
-    /**
-     * Marks framework-internal threads.
-     */
-    private static final ThreadLocal<Boolean> IS_INTERNAL_THREAD = new ThreadLocal<>();
-
     private ThreadUtils() {
         throw new UnsupportedOperationException("No instance");
     }
 
-    /**
-     * Runs a piece of code given as a {@link Callable} with a flag set indicating that the executing thread is an
-     * Apache MINA sshd framework-internal thread.
-     *
-     * @param  <V>       return type
-     * @param  code      code to run
-     * @return           the result of {@code code}
-     * @throws Exception propagated from {@code code.call()}
-     * @see              #isInternalThread()
-     */
-    public static <V> V runAsInternal(Callable<V> code) throws Exception {
-        if (isInternalThread()) {
-            return code.call();
-        }
-        try {
-            IS_INTERNAL_THREAD.set(Boolean.TRUE);
-            return code.call();
-        } finally {
-            IS_INTERNAL_THREAD.remove();
-        }
-    }
-
-    /**
-     * Runs an {@link IOFunction} with a flag set indicating that the executing thread is an Apache MINA sshd
-     * framework-internal thread.
-     *
-     * @param  <T>         parameter type
-     * @param  <V>         return type
-     * @param  param       parameter for the function
-     * @param  code        function to run
-     * @return             the result of {@code code}
-     * @throws IOException propagated from {@code code.apply()}
-     * @see                #isInternalThread()
-     */
-    public static <T, V> V runAsInternal(T param, IOFunction<? super T, V> code) throws IOException {
-        if (isInternalThread()) {
-            return code.apply(param);
-        }
-        try {
-            IS_INTERNAL_THREAD.set(Boolean.TRUE);
-            return code.apply(param);
-        } finally {
-            IS_INTERNAL_THREAD.remove();
-        }
-    }
-
-    /**
-     * Tells whether the calling thread is an Apache MINA sshd framework-internal thread.
-     *
-     * @return {@code true} if the thread is considered internal to the framework; {@code false} if not
-     * @see    #runAsInternal(Callable)
-     */
-    public static boolean isInternalThread() {
-        return Boolean.TRUE.equals(IS_INTERNAL_THREAD.get());
-    }
-
     /**
      * Wraps an {@link CloseableExecutorService} in such a way as to &quot;protect&quot; it for calls to the
      * {@link CloseableExecutorService#shutdown()} or {@link CloseableExecutorService#shutdownNow()}. All other calls
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultForwarder.java b/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultForwarder.java
index 75fc53ba3..b27f9f8ab 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultForwarder.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultForwarder.java
@@ -65,7 +65,6 @@ import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
 import org.apache.sshd.common.util.closeable.AbstractInnerCloseable;
 import org.apache.sshd.common.util.io.functors.IOConsumer;
 import org.apache.sshd.common.util.net.SshdSocketAddress;
-import org.apache.sshd.common.util.threads.ThreadUtils;
 import org.apache.sshd.core.CoreModuleProperties;
 import org.apache.sshd.server.forward.TcpForwardingFilter;
 
@@ -875,7 +874,7 @@ public class DefaultForwarder
                         session, channel, totalMessages, message.available());
             }
             session.suspendRead();
-            ThreadUtils.runAsInternal(() -> channel.getAsyncIn().writeBuffer(buffer).addListener(f -> {
+            channel.getAsyncIn().writeBuffer(buffer).addListener(f -> {
                 session.resumeRead();
                 Throwable e = f.getException();
                 if (e != null) {
@@ -889,7 +888,7 @@ public class DefaultForwarder
                 } else if (log.isTraceEnabled()) {
                     log.trace("messageReceived({}) channel={} message={} forwarded", session, channel, totalMessages);
                 }
-            }));
+            });
         }
 
         @Override
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/SocksProxy.java b/sshd-core/src/main/java/org/apache/sshd/common/forward/SocksProxy.java
index 28a9d6fe2..46880fb34 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/forward/SocksProxy.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/SocksProxy.java
@@ -33,7 +33,6 @@ import org.apache.sshd.common.util.buffer.Buffer;
 import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
 import org.apache.sshd.common.util.closeable.AbstractCloseable;
 import org.apache.sshd.common.util.net.SshdSocketAddress;
-import org.apache.sshd.common.util.threads.ThreadUtils;
 
 /**
  * SOCKS proxy server, supporting simple socks4/5 protocols.
@@ -102,8 +101,7 @@ public class SocksProxy extends AbstractCloseable implements IoHandler {
 
         protected void onMessage(Buffer buffer) throws IOException {
             session.suspendRead();
-            ThreadUtils.runAsInternal(channel.getAsyncIn(),
-                    out -> out.writeBuffer(buffer).addListener(f -> session.resumeRead()));
+            channel.getAsyncIn().writeBuffer(buffer).addListener(f -> session.resumeRead());
         }
 
         @Override
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java b/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
index 1465a632f..89c5a4ca6 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
@@ -370,7 +370,7 @@ public class TcpipServerChannel extends AbstractServerChannel implements Forward
                 Buffer buffer = new ByteArrayBuffer(length, false);
                 buffer.putBuffer(message);
                 session.suspendRead();
-                ThreadUtils.runAsInternal(() -> out.writeBuffer(buffer).addListener(f -> {
+                out.writeBuffer(buffer).addListener(f -> {
                     session.resumeRead();
                     Throwable e = f.getException();
                     if (e != null) {
@@ -380,7 +380,7 @@ public class TcpipServerChannel extends AbstractServerChannel implements Forward
                     } else if (log.isTraceEnabled()) {
                         log.trace("messageReceived({}) channel={} message forwarded", session, TcpipServerChannel.this);
                     }
-                }));
+                });
             }
         }
 

Reply via email to