This is an automated email from the ASF dual-hosted git repository.

desruisseaux pushed a commit to branch geoapi-4.0
in repository https://gitbox.apache.org/repos/asf/sis.git

commit f8b2f5024bd691ea5d175f9ea148e15d07a267b1
Author: Martin Desruisseaux <martin.desruisse...@geomatys.com>
AuthorDate: Thu Dec 22 17:19:10 2022 +0100

    Close idle connection after a timeout.
---
 .../sis/internal/system/DelayedExecutor.java       |  2 +-
 .../internal/storage/io/FileCacheByteChannel.java  | 89 +++++++++++++++++++++-
 2 files changed, 87 insertions(+), 4 deletions(-)

diff --git 
a/core/sis-utility/src/main/java/org/apache/sis/internal/system/DelayedExecutor.java
 
b/core/sis-utility/src/main/java/org/apache/sis/internal/system/DelayedExecutor.java
index a72d7b3da3..db2129ae9e 100644
--- 
a/core/sis-utility/src/main/java/org/apache/sis/internal/system/DelayedExecutor.java
+++ 
b/core/sis-utility/src/main/java/org/apache/sis/internal/system/DelayedExecutor.java
@@ -28,7 +28,7 @@ import static java.util.logging.Logger.getLogger;
  * A thread executing short tasks after some (potentially zero nanosecond) 
delay.
  * This class should be reserved to internal SIS usage without user's code.
  * In practice some user code may be indirectly executed through SIS tasks 
invoking overrideable methods.
- * But all submitted tasks shall be very quick, since there is only one thread 
shared by everyone.
+ * But all submitted tasks shall be very quick, because there is only one 
thread shared by everyone.
  *
  * <p>The methods for use in this class are:</p>
  * <ul>
diff --git 
a/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/FileCacheByteChannel.java
 
b/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/FileCacheByteChannel.java
index b40ac37e6c..6ca122e7bd 100644
--- 
a/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/FileCacheByteChannel.java
+++ 
b/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/FileCacheByteChannel.java
@@ -26,12 +26,18 @@ import java.nio.channels.FileChannel;
 import java.nio.channels.SeekableByteChannel;
 import java.nio.channels.NonWritableChannelException;
 import org.apache.sis.internal.storage.Resources;
+import org.apache.sis.internal.system.DelayedExecutor;
+import org.apache.sis.internal.system.DelayedRunnable;
+import org.apache.sis.internal.system.Modules;
 import org.apache.sis.internal.util.Strings;
 import org.apache.sis.util.ArgumentChecks;
 import org.apache.sis.util.ArraysExt;
 import org.apache.sis.util.CharSequences;
 import org.apache.sis.util.resources.Errors;
 import org.apache.sis.util.collection.RangeSet;
+import org.apache.sis.util.logging.Logging;
+
+import static java.util.logging.Logger.getLogger;
 
 
 /**
@@ -72,6 +78,11 @@ public abstract class FileCacheByteChannel implements 
SeekableByteChannel {
      */
     protected static final String RANGES_UNIT = "bytes";
 
+    /**
+     * Number of nanoseconds to wait before to close an inactive connection.
+     */
+    private static final long TIMEOUT = 2 * 1000_000_000L;
+
     /**
      * Information about an input stream and its range of bytes.
      * This is the return value of {@link #openConnection(long, long)}.
@@ -453,6 +464,7 @@ public abstract class FileCacheByteChannel implements 
SeekableByteChannel {
         offset = skipInInput(offset);
         if (offset != 0) {
             count = readFromCache(dst);
+            usedConnection();
             if (count >= 0) {
                 return count;
             }
@@ -483,10 +495,11 @@ public abstract class FileCacheByteChannel implements 
SeekableByteChannel {
                 buffer.limit(limit);
             }
             if (buffer != dst) {
-                dst.put(buffer.flip());     // Transfer temporary to 
destination buffer.
+                dst.put(buffer.flip());             // Transfer from temporary 
buffer to destination buffer.
             }
             position += count;
         }
+        usedConnection();
         return count;
     }
 
@@ -545,6 +558,7 @@ public abstract class FileCacheByteChannel implements 
SeekableByteChannel {
      * @throws IOException if an I/O error occurred.
      */
     private long drainAndAbort() throws IOException {
+        assert Thread.holdsLock(this);
         long count = 0;
         final InputStream input = connection.input;
         for (int c; (c = input.available()) > 0;) {
@@ -608,8 +622,9 @@ public abstract class FileCacheByteChannel implements 
SeekableByteChannel {
     @Override
     public synchronized void close() throws IOException {
         final Connection c = connection;
-        connection = null;
-        transfer = null;
+        connection  = null;
+        transfer    = null;
+        idleHandler = null;
         try (file) {
             if (c != null && !abort(c.input)) {
                 c.input.close();
@@ -617,6 +632,74 @@ public abstract class FileCacheByteChannel implements 
SeekableByteChannel {
         }
     }
 
+    /**
+     * Notifies that the connection has been used and should not be closed 
before some timeout.
+     * This method may schedule a task to be executed in a background thread 
after the timeout.
+     * If the connection can not read sub-ranges of bytes, then this method 
does nothing because
+     * reopening a new connection would be costly.
+     */
+    private void usedConnection() {
+        assert Thread.holdsLock(this);
+        final Connection c = connection;
+        if (c != null && c.acceptRanges) {
+            final long lastReadTime = System.nanoTime();
+            if (idleHandler != null) {
+                idleHandler.lastReadTime = lastReadTime;
+            } else {
+                idleHandler = new IdleConnectionCloser(lastReadTime);
+                DelayedExecutor.schedule(idleHandler);
+            }
+        }
+    }
+
+    /**
+     * The task which has been scheduled for closing inactive connection, or 
{@code null} if none.
+     */
+    private IdleConnectionCloser idleHandler;
+
+    /**
+     * A task to execute when the connection is inactive for a time longer 
than the timeout.
+     * This is needed because the number of connections that we can create may 
be small (e.g. 50),
+     * and keeping an inactive connection in this channel may prevent other 
channels to work.
+     *
+     * @see #TIMEOUT
+     */
+    private final class IdleConnectionCloser extends DelayedRunnable {
+        /**
+         * Value of {@link System#nanoTime()} at the last time that {@link 
#read(ByteBuffer)} has been invoked.
+         */
+        long lastReadTime;
+
+        /**
+         * Creates a new task to be executed at the given time relative to 
{@link System#nanoTime()}.
+         */
+        IdleConnectionCloser(final long lastReadTime) {
+            super(lastReadTime + TIMEOUT);
+            this.lastReadTime = lastReadTime;
+        }
+
+        /**
+         * Invoked in a background thread after a delay for closing a possibly 
inactive connection.
+         * If this method confirms that the connection has been inactive for a 
time longer than the timeout,
+         * then the connection is closed. Otherwise a new task is scheduled 
for checking again later.
+         */
+        @Override public void run() {
+            synchronized (FileCacheByteChannel.this) {
+                idleHandler = null;
+                final Connection c = connection;
+                if (c != null && c.acceptRanges) {
+                    if (System.nanoTime() - lastReadTime < TIMEOUT) {
+                        idleHandler = new IdleConnectionCloser(lastReadTime);
+                    } else try {
+                        drainAndAbort();
+                    } catch (IOException e) {
+                        
Logging.unexpectedException(getLogger(Modules.STORAGE), 
IdleConnectionCloser.class, "run", e);
+                    }
+                }
+            }
+        }
+    }
+
     /**
      * Returns a string representation for debugging purpose.
      */

Reply via email to