This is an automated email from the ASF dual-hosted git repository. twolf pushed a commit to branch dev_3.0 in repository https://gitbox.apache.org/repos/asf/mina-sshd.git
commit 056e3044e4d449767d24e0b7459dcc1ed9e46889 Author: Thomas Wolf <tw...@apache.org> AuthorDate: Sun Apr 6 15:30:29 2025 +0200 Drop ThreadUtils.isInternalThread() This was used in version 2.X to decide whether a thread writing messages during KEX could be blocked. In 3.X, we never block threads explicitly during KEX, we just close all channel windows, which will implicitly block data pumping threads until KEX is over. --- .../sshd/common/future/AbstractSshFuture.java | 6 +- .../sshd/common/util/threads/ThreadUtils.java | 64 ---------------------- .../sshd/common/forward/DefaultForwarder.java | 5 +- .../org/apache/sshd/common/forward/SocksProxy.java | 4 +- .../sshd/server/forward/TcpipServerChannel.java | 4 +- 5 files changed, 6 insertions(+), 77 deletions(-) diff --git a/sshd-common/src/main/java/org/apache/sshd/common/future/AbstractSshFuture.java b/sshd-common/src/main/java/org/apache/sshd/common/future/AbstractSshFuture.java index 7b52dca16..a8e3c16c5 100644 --- a/sshd-common/src/main/java/org/apache/sshd/common/future/AbstractSshFuture.java +++ b/sshd-common/src/main/java/org/apache/sshd/common/future/AbstractSshFuture.java @@ -31,7 +31,6 @@ import org.apache.sshd.common.SshException; import org.apache.sshd.common.util.ExceptionUtils; import org.apache.sshd.common.util.GenericUtils; import org.apache.sshd.common.util.logging.AbstractLoggingBean; -import org.apache.sshd.common.util.threads.ThreadUtils; /** * @param <T> Type of future @@ -174,10 +173,7 @@ public abstract class AbstractSshFuture<T extends SshFuture<T>> extends Abstract protected void notifyListener(SshFutureListener<T> l) { try { T arg = asT(); - ThreadUtils.runAsInternal(() -> { - l.operationComplete(arg); - return null; - }); + l.operationComplete(arg); } catch (Throwable t) { warn("notifyListener({}) failed ({}) to invoke {}: {}", this, t.getClass().getSimpleName(), l, t.getMessage(), t); diff --git a/sshd-common/src/main/java/org/apache/sshd/common/util/threads/ThreadUtils.java b/sshd-common/src/main/java/org/apache/sshd/common/util/threads/ThreadUtils.java index d5e84d05b..a1cdf4c3d 100644 --- a/sshd-common/src/main/java/org/apache/sshd/common/util/threads/ThreadUtils.java +++ b/sshd-common/src/main/java/org/apache/sshd/common/util/threads/ThreadUtils.java @@ -18,10 +18,8 @@ */ package org.apache.sshd.common.util.threads; -import java.io.IOException; import java.util.Iterator; import java.util.NoSuchElementException; -import java.util.concurrent.Callable; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -31,7 +29,6 @@ import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import org.apache.sshd.common.util.ReflectionUtils; -import org.apache.sshd.common.util.io.functors.IOFunction; /** * Utility class for thread pools. @@ -40,71 +37,10 @@ import org.apache.sshd.common.util.io.functors.IOFunction; */ public final class ThreadUtils { - /** - * Marks framework-internal threads. - */ - private static final ThreadLocal<Boolean> IS_INTERNAL_THREAD = new ThreadLocal<>(); - private ThreadUtils() { throw new UnsupportedOperationException("No instance"); } - /** - * Runs a piece of code given as a {@link Callable} with a flag set indicating that the executing thread is an - * Apache MINA sshd framework-internal thread. - * - * @param <V> return type - * @param code code to run - * @return the result of {@code code} - * @throws Exception propagated from {@code code.call()} - * @see #isInternalThread() - */ - public static <V> V runAsInternal(Callable<V> code) throws Exception { - if (isInternalThread()) { - return code.call(); - } - try { - IS_INTERNAL_THREAD.set(Boolean.TRUE); - return code.call(); - } finally { - IS_INTERNAL_THREAD.remove(); - } - } - - /** - * Runs an {@link IOFunction} with a flag set indicating that the executing thread is an Apache MINA sshd - * framework-internal thread. - * - * @param <T> parameter type - * @param <V> return type - * @param param parameter for the function - * @param code function to run - * @return the result of {@code code} - * @throws IOException propagated from {@code code.apply()} - * @see #isInternalThread() - */ - public static <T, V> V runAsInternal(T param, IOFunction<? super T, V> code) throws IOException { - if (isInternalThread()) { - return code.apply(param); - } - try { - IS_INTERNAL_THREAD.set(Boolean.TRUE); - return code.apply(param); - } finally { - IS_INTERNAL_THREAD.remove(); - } - } - - /** - * Tells whether the calling thread is an Apache MINA sshd framework-internal thread. - * - * @return {@code true} if the thread is considered internal to the framework; {@code false} if not - * @see #runAsInternal(Callable) - */ - public static boolean isInternalThread() { - return Boolean.TRUE.equals(IS_INTERNAL_THREAD.get()); - } - /** * Wraps an {@link CloseableExecutorService} in such a way as to "protect" it for calls to the * {@link CloseableExecutorService#shutdown()} or {@link CloseableExecutorService#shutdownNow()}. All other calls diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultForwarder.java b/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultForwarder.java index 75fc53ba3..b27f9f8ab 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultForwarder.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultForwarder.java @@ -65,7 +65,6 @@ import org.apache.sshd.common.util.buffer.ByteArrayBuffer; import org.apache.sshd.common.util.closeable.AbstractInnerCloseable; import org.apache.sshd.common.util.io.functors.IOConsumer; import org.apache.sshd.common.util.net.SshdSocketAddress; -import org.apache.sshd.common.util.threads.ThreadUtils; import org.apache.sshd.core.CoreModuleProperties; import org.apache.sshd.server.forward.TcpForwardingFilter; @@ -875,7 +874,7 @@ public class DefaultForwarder session, channel, totalMessages, message.available()); } session.suspendRead(); - ThreadUtils.runAsInternal(() -> channel.getAsyncIn().writeBuffer(buffer).addListener(f -> { + channel.getAsyncIn().writeBuffer(buffer).addListener(f -> { session.resumeRead(); Throwable e = f.getException(); if (e != null) { @@ -889,7 +888,7 @@ public class DefaultForwarder } else if (log.isTraceEnabled()) { log.trace("messageReceived({}) channel={} message={} forwarded", session, channel, totalMessages); } - })); + }); } @Override diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/SocksProxy.java b/sshd-core/src/main/java/org/apache/sshd/common/forward/SocksProxy.java index 28a9d6fe2..46880fb34 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/forward/SocksProxy.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/SocksProxy.java @@ -33,7 +33,6 @@ import org.apache.sshd.common.util.buffer.Buffer; import org.apache.sshd.common.util.buffer.ByteArrayBuffer; import org.apache.sshd.common.util.closeable.AbstractCloseable; import org.apache.sshd.common.util.net.SshdSocketAddress; -import org.apache.sshd.common.util.threads.ThreadUtils; /** * SOCKS proxy server, supporting simple socks4/5 protocols. @@ -102,8 +101,7 @@ public class SocksProxy extends AbstractCloseable implements IoHandler { protected void onMessage(Buffer buffer) throws IOException { session.suspendRead(); - ThreadUtils.runAsInternal(channel.getAsyncIn(), - out -> out.writeBuffer(buffer).addListener(f -> session.resumeRead())); + channel.getAsyncIn().writeBuffer(buffer).addListener(f -> session.resumeRead()); } @Override diff --git a/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java b/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java index 1465a632f..89c5a4ca6 100644 --- a/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java +++ b/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java @@ -370,7 +370,7 @@ public class TcpipServerChannel extends AbstractServerChannel implements Forward Buffer buffer = new ByteArrayBuffer(length, false); buffer.putBuffer(message); session.suspendRead(); - ThreadUtils.runAsInternal(() -> out.writeBuffer(buffer).addListener(f -> { + out.writeBuffer(buffer).addListener(f -> { session.resumeRead(); Throwable e = f.getException(); if (e != null) { @@ -380,7 +380,7 @@ public class TcpipServerChannel extends AbstractServerChannel implements Forward } else if (log.isTraceEnabled()) { log.trace("messageReceived({}) channel={} message forwarded", session, TcpipServerChannel.this); } - })); + }); } }