This is an automated email from the ASF dual-hosted git repository. desruisseaux pushed a commit to branch geoapi-4.0 in repository https://gitbox.apache.org/repos/asf/sis.git
commit f8b2f5024bd691ea5d175f9ea148e15d07a267b1 Author: Martin Desruisseaux <martin.desruisse...@geomatys.com> AuthorDate: Thu Dec 22 17:19:10 2022 +0100 Close idle connection after a timeout. --- .../sis/internal/system/DelayedExecutor.java | 2 +- .../internal/storage/io/FileCacheByteChannel.java | 89 +++++++++++++++++++++- 2 files changed, 87 insertions(+), 4 deletions(-) diff --git a/core/sis-utility/src/main/java/org/apache/sis/internal/system/DelayedExecutor.java b/core/sis-utility/src/main/java/org/apache/sis/internal/system/DelayedExecutor.java index a72d7b3da3..db2129ae9e 100644 --- a/core/sis-utility/src/main/java/org/apache/sis/internal/system/DelayedExecutor.java +++ b/core/sis-utility/src/main/java/org/apache/sis/internal/system/DelayedExecutor.java @@ -28,7 +28,7 @@ import static java.util.logging.Logger.getLogger; * A thread executing short tasks after some (potentially zero nanosecond) delay. * This class should be reserved to internal SIS usage without user's code. * In practice some user code may be indirectly executed through SIS tasks invoking overrideable methods. - * But all submitted tasks shall be very quick, since there is only one thread shared by everyone. + * But all submitted tasks shall be very quick, because there is only one thread shared by everyone. * * <p>The methods for use in this class are:</p> * <ul> diff --git a/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/FileCacheByteChannel.java b/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/FileCacheByteChannel.java index b40ac37e6c..6ca122e7bd 100644 --- a/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/FileCacheByteChannel.java +++ b/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/FileCacheByteChannel.java @@ -26,12 +26,18 @@ import java.nio.channels.FileChannel; import java.nio.channels.SeekableByteChannel; import java.nio.channels.NonWritableChannelException; import org.apache.sis.internal.storage.Resources; +import org.apache.sis.internal.system.DelayedExecutor; +import org.apache.sis.internal.system.DelayedRunnable; +import org.apache.sis.internal.system.Modules; import org.apache.sis.internal.util.Strings; import org.apache.sis.util.ArgumentChecks; import org.apache.sis.util.ArraysExt; import org.apache.sis.util.CharSequences; import org.apache.sis.util.resources.Errors; import org.apache.sis.util.collection.RangeSet; +import org.apache.sis.util.logging.Logging; + +import static java.util.logging.Logger.getLogger; /** @@ -72,6 +78,11 @@ public abstract class FileCacheByteChannel implements SeekableByteChannel { */ protected static final String RANGES_UNIT = "bytes"; + /** + * Number of nanoseconds to wait before to close an inactive connection. + */ + private static final long TIMEOUT = 2 * 1000_000_000L; + /** * Information about an input stream and its range of bytes. * This is the return value of {@link #openConnection(long, long)}. @@ -453,6 +464,7 @@ public abstract class FileCacheByteChannel implements SeekableByteChannel { offset = skipInInput(offset); if (offset != 0) { count = readFromCache(dst); + usedConnection(); if (count >= 0) { return count; } @@ -483,10 +495,11 @@ public abstract class FileCacheByteChannel implements SeekableByteChannel { buffer.limit(limit); } if (buffer != dst) { - dst.put(buffer.flip()); // Transfer temporary to destination buffer. + dst.put(buffer.flip()); // Transfer from temporary buffer to destination buffer. } position += count; } + usedConnection(); return count; } @@ -545,6 +558,7 @@ public abstract class FileCacheByteChannel implements SeekableByteChannel { * @throws IOException if an I/O error occurred. */ private long drainAndAbort() throws IOException { + assert Thread.holdsLock(this); long count = 0; final InputStream input = connection.input; for (int c; (c = input.available()) > 0;) { @@ -608,8 +622,9 @@ public abstract class FileCacheByteChannel implements SeekableByteChannel { @Override public synchronized void close() throws IOException { final Connection c = connection; - connection = null; - transfer = null; + connection = null; + transfer = null; + idleHandler = null; try (file) { if (c != null && !abort(c.input)) { c.input.close(); @@ -617,6 +632,74 @@ public abstract class FileCacheByteChannel implements SeekableByteChannel { } } + /** + * Notifies that the connection has been used and should not be closed before some timeout. + * This method may schedule a task to be executed in a background thread after the timeout. + * If the connection can not read sub-ranges of bytes, then this method does nothing because + * reopening a new connection would be costly. + */ + private void usedConnection() { + assert Thread.holdsLock(this); + final Connection c = connection; + if (c != null && c.acceptRanges) { + final long lastReadTime = System.nanoTime(); + if (idleHandler != null) { + idleHandler.lastReadTime = lastReadTime; + } else { + idleHandler = new IdleConnectionCloser(lastReadTime); + DelayedExecutor.schedule(idleHandler); + } + } + } + + /** + * The task which has been scheduled for closing inactive connection, or {@code null} if none. + */ + private IdleConnectionCloser idleHandler; + + /** + * A task to execute when the connection is inactive for a time longer than the timeout. + * This is needed because the number of connections that we can create may be small (e.g. 50), + * and keeping an inactive connection in this channel may prevent other channels to work. + * + * @see #TIMEOUT + */ + private final class IdleConnectionCloser extends DelayedRunnable { + /** + * Value of {@link System#nanoTime()} at the last time that {@link #read(ByteBuffer)} has been invoked. + */ + long lastReadTime; + + /** + * Creates a new task to be executed at the given time relative to {@link System#nanoTime()}. + */ + IdleConnectionCloser(final long lastReadTime) { + super(lastReadTime + TIMEOUT); + this.lastReadTime = lastReadTime; + } + + /** + * Invoked in a background thread after a delay for closing a possibly inactive connection. + * If this method confirms that the connection has been inactive for a time longer than the timeout, + * then the connection is closed. Otherwise a new task is scheduled for checking again later. + */ + @Override public void run() { + synchronized (FileCacheByteChannel.this) { + idleHandler = null; + final Connection c = connection; + if (c != null && c.acceptRanges) { + if (System.nanoTime() - lastReadTime < TIMEOUT) { + idleHandler = new IdleConnectionCloser(lastReadTime); + } else try { + drainAndAbort(); + } catch (IOException e) { + Logging.unexpectedException(getLogger(Modules.STORAGE), IdleConnectionCloser.class, "run", e); + } + } + } + } + } + /** * Returns a string representation for debugging purpose. */