This is an automated email from the ASF dual-hosted git repository.
desruisseaux pushed a commit to branch geoapi-4.0
in repository https://gitbox.apache.org/repos/asf/sis.git
The following commit(s) were added to refs/heads/geoapi-4.0 by this push:
new 35a3827ce2 Move `CachedByteChannel` implementation from S3 module to
`sis-storage` for sharing the code with other protocols requiring cache (e.g.
HTTP). This new implementation tries to download only needed ranges of bytes.
35a3827ce2 is described below
commit 35a3827ce2684841229f6e32fed4f99d62b56dc3
Author: Martin Desruisseaux <[email protected]>
AuthorDate: Thu Dec 15 15:58:13 2022 +0100
Move `CachedByteChannel` implementation from S3 module to `sis-storage`
for sharing the code with other protocols requiring cache (e.g. HTTP).
This new implementation tries to download only needed ranges of bytes.
---
cloud/pom.xml | 4 +-
cloud/sis-cloud-aws/pom.xml | 2 +-
.../apache/sis/cloud/aws/s3/CachedByteChannel.java | 293 ++--------
.../org/apache/sis/cloud/aws/s3/FileService.java | 32 +-
.../java/org/apache/sis/internal/jdk17/JDK17.java | 19 +
.../java/org/apache/sis/internal/jdk17/Record.java | 35 ++
.../org/apache/sis/util/collection/RangeSet.java | 61 +-
.../apache/sis/util/collection/package-info.java | 2 +-
pom.xml | 2 +-
.../sis/internal/storage/io/ChannelData.java | 3 +-
.../sis/internal/storage/io/ChannelDataInput.java | 5 +-
.../internal/storage/io/FileCacheByteChannel.java | 637 +++++++++++++++++++++
.../internal/storage/io/ComputedInputStream.java | 149 +++++
.../storage/io/FileCacheByteChannelTest.java | 199 +++++++
.../apache/sis/test/suite/StorageTestSuite.java | 3 +-
15 files changed, 1160 insertions(+), 286 deletions(-)
diff --git a/cloud/pom.xml b/cloud/pom.xml
index 3a2959c1ce..4062f17c4b 100644
--- a/cloud/pom.xml
+++ b/cloud/pom.xml
@@ -92,8 +92,8 @@
=========================================================== -->
<dependencies>
<dependency>
- <groupId>org.apache.sis.core</groupId>
- <artifactId>sis-utility</artifactId>
+ <groupId>org.apache.sis.storage</groupId>
+ <artifactId>sis-storage</artifactId>
<version>${project.version}</version>
</dependency>
diff --git a/cloud/sis-cloud-aws/pom.xml b/cloud/sis-cloud-aws/pom.xml
index 3daa4a0340..0ec1620512 100644
--- a/cloud/sis-cloud-aws/pom.xml
+++ b/cloud/sis-cloud-aws/pom.xml
@@ -90,7 +90,7 @@
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
- <version>2.18.25</version>
+ <version>2.18.40</version>
</dependency>
</dependencies>
diff --git
a/cloud/sis-cloud-aws/src/main/java/org/apache/sis/cloud/aws/s3/CachedByteChannel.java
b/cloud/sis-cloud-aws/src/main/java/org/apache/sis/cloud/aws/s3/CachedByteChannel.java
index 8ca227467a..135e2ba08f 100644
---
a/cloud/sis-cloud-aws/src/main/java/org/apache/sis/cloud/aws/s3/CachedByteChannel.java
+++
b/cloud/sis-cloud-aws/src/main/java/org/apache/sis/cloud/aws/s3/CachedByteChannel.java
@@ -16,272 +16,105 @@
*/
package org.apache.sis.cloud.aws.s3;
-import java.io.EOFException;
+import java.util.List;
+import java.util.OptionalLong;
import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.file.Files;
-import java.nio.file.StandardOpenOption;
-import java.nio.channels.FileChannel;
-import java.nio.channels.SeekableByteChannel;
-import org.apache.sis.internal.util.Strings;
-import org.apache.sis.util.ArgumentChecks;
-import org.apache.sis.util.resources.Errors;
-import software.amazon.awssdk.core.ResponseInputStream;
+import java.io.InputStream;
+import org.apache.sis.internal.storage.io.FileCacheByteChannel;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
+import software.amazon.awssdk.core.ResponseInputStream;
+import software.amazon.awssdk.core.exception.SdkException;
+import software.amazon.awssdk.http.Abortable;
/**
* A seekable byte channel which copies S3 data to a temporary file for
caching purposes.
*
* @author Martin Desruisseaux (Geomatys)
- * @version 1.2
+ * @version 1.4
* @since 1.2
* @module
*/
-final class CachedByteChannel implements SeekableByteChannel {
- /**
- * Size of the transfer buffer, in number of bytes.
- */
- private static final int BUFFER_SIZE = 8192;
-
- /**
- * The input stream from which to read data.
- */
- private final ResponseInputStream<GetObjectResponse> input;
-
- /**
- * The file where data are copied.
- */
- private final FileChannel file;
-
- /**
- * A temporary buffer for transferring data when we
- * cannot write directly in the destination buffer.
- */
- private ByteBuffer transfer;
-
- /**
- * Current position of this channel.
- */
- private long position;
-
- /**
- * Number of bytes in the temporary file.
- *
- * In current implementation this value shall be identical to {@code
file.position()}.
- * However, in a future implementation it will become different if we
allow some parts
- * of the file to be without data (sparse file), with data fetched using
HTTP ranges.
- */
- private long validLength;
-
+final class CachedByteChannel extends FileCacheByteChannel {
/**
- * Creates a new channel.
+ * Path to the S3 file to open.
*/
- CachedByteChannel(final ResponseInputStream<GetObjectResponse> stream)
throws IOException {
- input = stream;
- file = FileChannel.open(Files.createTempFile("S3-", null),
- StandardOpenOption.READ, StandardOpenOption.WRITE,
- StandardOpenOption.TRUNCATE_EXISTING,
StandardOpenOption.DELETE_ON_CLOSE);
- }
+ private final KeyPath path;
/**
- * Attempts to read up to <i>r</i> bytes from the channel,
- * where <i>r</i> is the number of bytes remaining in the buffer.
- * Bytes are written in the given buffer starting at the current position.
- * Upon return, the buffer position is advanced by the number of bytes
read.
+ * Creates a new channel for the S3 file identified by the given path.
+ * The connection will be opened when first needed.
*
- * @return number of bytes read, or -1 if end of file.
- */
- @Override
- public synchronized int read(final ByteBuffer dst) throws IOException {
- /*
- * If the channel position is before the end of cached data (i.e. a
backward seek has been done before),
- * use those data without downloading more data from the input stream.
Maybe available data are enough.
- */
- if (position < validLength) {
- final int limit = dst.limit();
- final int start = dst.position();
- final int end = (int) Math.min(limit, start + (validLength -
position));
- final int count;
- dst.limit(end);
- try {
- count = file.read(dst, position);
- position += count;
- } finally {
- dst.limit(limit);
- }
- return count;
- }
- /*
- * At this point we need to download data from the input stream.
- * Get a buffer that we can use with `InputStream.read(byte[])`.
- * It must be a buffer backed by a Java array.
- */
- final ByteBuffer buffer;
- if (dst.hasArray()) {
- buffer = dst;
- } else {
- if (transfer == null) {
- transfer = ByteBuffer.allocate(BUFFER_SIZE);
- }
- buffer = transfer;
- buffer.clear();
- buffer.limit(dst.remaining());
- }
- /*
- * Transfer bytes from the input stream to the buffer.
- * The bytes are also copied to the temporary file.
- */
- final int limit = buffer.limit();
- final int start = buffer.position();
- final int count = input.read(buffer.array(), buffer.arrayOffset() +
start, limit - start);
- if (count > 0) {
- buffer.limit(start + count);
- try {
- cache(buffer);
- } finally {
- buffer.limit(limit);
- }
- /*
- * If we used a temporary buffer, transfer to the destination
buffer.
- */
- if (buffer != dst) {
- buffer.flip();
- dst.put(buffer);
- }
- position += count;
- }
- return count;
- }
-
- /**
- * Writes fully the given buffer in the cache {@linkplain #file}.
- * The data to write starts at current buffer position and stops at the
buffer limit.
- */
- private void cache(final ByteBuffer buffer) throws IOException {
- do {
- if (file.write(buffer) == 0) {
- // Should never happen, but check anyway as a safety against
never-ending loop.
- throw new IOException();
- }
- validLength = file.position();
- } while (buffer.hasRemaining());
- }
-
- /**
- * Attempts to write up to <i>r</i> bytes to the channel,
- * where <i>r</i> is the number of bytes remaining in the buffer.
- * Bytes are read from the given buffer starting at the current position.
- * Upon return, the buffer position is advanced by the number of bytes
written.
+ * @param path path to the S3 file to open.
+ * @throws IOException if the temporary file can not be created.
*/
- @Override
- public int write(final ByteBuffer src) throws IOException {
- throw new IOException("Not supported yet.");
+ CachedByteChannel(final KeyPath path) throws IOException {
+ super("S3-");
+ this.path = path;
}
/**
- * Returns this channel's position.
+ * Returns the filename to use in error messages.
*/
@Override
- public synchronized long position() {
- return position;
+ protected String filename() {
+ return path.getFileName().toString();
}
/**
- * Sets this channel's position.
+ * Creates an input stream which provides the bytes to read starting at
the specified position.
*
- * @param newPosition number of bytes from the beginning to the desired
position.
- * @return {@code this} for method call chaining.
- * @throws IOException if an I/O error occurs.
+ * @param start position of the first byte to read (inclusive).
+ * @param end position of the last byte to read with the returned
stream (inclusive),
+ * or {@link Long#MAX_VALUE} for end of stream.
+ * @return contains the input stream providing the bytes to read starting
at the given start position.
*/
@Override
- public synchronized SeekableByteChannel position(final long newPosition)
throws IOException {
- ArgumentChecks.ensurePositive("newPosition", newPosition);
- long remaining = newPosition - validLength;
- if (remaining > 0) {
- if (transfer == null) {
- transfer = ByteBuffer.allocate(BUFFER_SIZE);
- }
- final ByteBuffer buffer = transfer;
- do {
- buffer.clear();
- if (remaining < BUFFER_SIZE) {
- buffer.limit((int) remaining);
- }
- final int count = input.read(buffer.array(), 0,
buffer.limit());
- if (count <= 0) {
- final Long size = input.response().contentLength();
- throw new
EOFException(Errors.format(Errors.Keys.ValueOutOfRange_4, "newPosition", 0,
size, newPosition));
+ protected Connection openConnection(long start, long end) throws
IOException {
+ final ResponseInputStream<GetObjectResponse> stream;
+ final String contentRange, acceptRanges;
+ final Long contentLength;
+ try {
+ GetObjectRequest.Builder builder =
GetObjectRequest.builder().bucket(path.bucket).key(path.key);
+ final boolean hasEnd = (end > start) && (end != Long.MAX_VALUE);
+ if (start != 0 || hasEnd) {
+ final StringBuilder range = new
StringBuilder(RANGES_UNIT).append('=').append(start);
+ if (hasEnd) {
+ range.append('-').append(end); // Inclusive.
}
- buffer.limit(count);
- cache(buffer);
- remaining -= count;
- } while (remaining > 0);
+ builder = builder.range(range.toString());
+ }
+ stream = path.fs.client().getObject(builder.build());
+ final GetObjectResponse response = stream.response();
+ contentLength = response.contentLength();
+ contentRange = response.contentRange();
+ acceptRanges = response.acceptRanges();
+ } catch (SdkException e) {
+ throw FileService.failure(path, e);
}
- position = newPosition;
- return this;
- }
-
- /**
- * Returns the size of the S3 file.
- *
- * @return number of bytes in the file.
- * @throws IOException if the information is not available.
- */
- @Override
- public long size() throws IOException {
- final Long size = input.response().contentLength();
- if (size != null) return size;
- throw new IOException();
- }
-
- /**
- * Truncates the file to the given size.
- *
- * @param size the new size in bytes.
- * @throws IOException if the operation is not supported.
- */
- @Override
- public SeekableByteChannel truncate(long size) throws IOException {
- throw new IOException("Not supported yet.");
- }
-
- /**
- * Tells whether this channel is open.
- *
- * @return {@code true} if this channel is open.
- */
- @Override
- public boolean isOpen() { // No synchronization, rely on
`FileChannel` thread safety instead.
- return file.isOpen();
+ final List<String> arl = (acceptRanges != null) ?
List.of(acceptRanges) : List.of();
+ if (contentRange == null) {
+ final long length = (contentLength != null) ? contentLength : -1;
+ return new Connection(stream, 0, (length < 0) ? Long.MAX_VALUE :
length, length, Connection.acceptRanges(arl));
+ }
+ return new Connection(stream, contentRange, arl,
+ (contentLength != null) ? OptionalLong.of(contentLength) :
OptionalLong.empty());
}
/**
- * Closes this channel and releases resources.
+ * Invoked when this channel is no longer interested in reading bytes from
the specified stream.
*
- * @throws IOException if an error occurred while closing the channel.
+ * @param input the input stream to eventually close.
+ * @return whether the given input stream has been closed by this method.
*/
@Override
- public synchronized void close() throws IOException {
- transfer = null;
- try {
- file.close();
- } catch (Throwable e) {
- try {
- input.close();
- } catch (Throwable s) {
- e.addSuppressed(s);
- }
- throw e;
+ protected boolean abort(final InputStream input) throws IOException {
+ if (input instanceof Abortable) {
+ ((Abortable) input).abort();
+ return true;
+ } else {
+ return super.abort(input);
}
- input.close();
- }
-
- /**
- * Returns a string representation for debugging purpose.
- */
- @Override
- public String toString() {
- return Strings.toString(getClass(), "position", position,
"validLength", validLength);
}
}
diff --git
a/cloud/sis-cloud-aws/src/main/java/org/apache/sis/cloud/aws/s3/FileService.java
b/cloud/sis-cloud-aws/src/main/java/org/apache/sis/cloud/aws/s3/FileService.java
index 9b44d65de7..15cbb192a6 100644
---
a/cloud/sis-cloud-aws/src/main/java/org/apache/sis/cloud/aws/s3/FileService.java
+++
b/cloud/sis-cloud-aws/src/main/java/org/apache/sis/cloud/aws/s3/FileService.java
@@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentHashMap;
import java.net.URI;
import java.io.IOException;
+import java.io.InputStream;
import java.nio.file.Path;
import java.nio.file.FileStore;
import java.nio.file.FileSystem;
@@ -455,20 +456,17 @@ public class FileService extends FileSystemProvider {
* @throws UnsupportedOperationException if an unsupported option is
specified.
* @throws IOException if an I/O error occurs.
*/
- public ResponseInputStream<GetObjectResponse> newInputStream(final Path
path, final OpenOption... options) throws IOException {
+ @Override
+ public InputStream newInputStream(final Path path, final OpenOption...
options) throws IOException {
+ ensureSupported(options);
final KeyPath kp = toAbsolute(path, true);
- for (final OpenOption opt: options) {
- if (opt == StandardOpenOption.APPEND || opt ==
StandardOpenOption.WRITE) {
- throw new
UnsupportedOperationException(Errors.format(Errors.Keys.UnsupportedArgumentValue_1,
opt));
- }
- }
+ final ResponseInputStream<GetObjectResponse> stream;
try {
- final ResponseInputStream<GetObjectResponse> stream =
kp.fs.client().getObject(
-
GetObjectRequest.builder().bucket(kp.bucket).key(kp.key).build());
- return stream;
+ stream =
kp.fs.client().getObject(GetObjectRequest.builder().bucket(kp.bucket).key(kp.key).build());
} catch (SdkException e) {
throw failure(path, e);
}
+ return stream;
}
/**
@@ -485,9 +483,19 @@ public class FileService extends FileSystemProvider {
public SeekableByteChannel newByteChannel(final Path path, final Set<?
extends OpenOption> options,
final FileAttribute<?>... attributes) throws IOException
{
- final ResponseInputStream<GetObjectResponse> stream =
- newInputStream(path, options.toArray(new
OpenOption[options.size()]));
- return new CachedByteChannel(stream);
+ ensureSupported(options.toArray(OpenOption[]::new));
+ return new CachedByteChannel(toAbsolute(path, true));
+ }
+
+ /**
+ * Ensures that the given array of options does not contain an unsupported
option.
+ */
+ private static void ensureSupported(final OpenOption[] options) {
+ for (final OpenOption opt : options) {
+ if (opt == StandardOpenOption.APPEND || opt ==
StandardOpenOption.WRITE) {
+ throw new
UnsupportedOperationException(Errors.format(Errors.Keys.UnsupportedArgumentValue_1,
opt));
+ }
+ }
}
/**
diff --git
a/core/sis-utility/src/main/java/org/apache/sis/internal/jdk17/JDK17.java
b/core/sis-utility/src/main/java/org/apache/sis/internal/jdk17/JDK17.java
index 9578b2ac21..6154d3ac4f 100644
--- a/core/sis-utility/src/main/java/org/apache/sis/internal/jdk17/JDK17.java
+++ b/core/sis-utility/src/main/java/org/apache/sis/internal/jdk17/JDK17.java
@@ -16,6 +16,9 @@
*/
package org.apache.sis.internal.jdk17;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.stream.Stream;
@@ -63,6 +66,22 @@ public final class JDK17 {
}
}
+ /**
+ * Place holder for {@link InputStream#skipNBytes(long)} method added in
JDK12.
+ */
+ public static void skipNBytes(final InputStream s, long n) throws
IOException {
+ while (n > 0) {
+ long c = s.skip(n);
+ if (c <= 0) {
+ if (c < 0 || s.read() < 0) {
+ throw new EOFException();
+ }
+ c = 1;
+ }
+ n -= c;
+ }
+ }
+
/**
* Place holder for {@link Stream#toList()} method added in JDK16.
*/
diff --git
a/core/sis-utility/src/main/java/org/apache/sis/internal/jdk17/Record.java
b/core/sis-utility/src/main/java/org/apache/sis/internal/jdk17/Record.java
new file mode 100644
index 0000000000..83bbe0e806
--- /dev/null
+++ b/core/sis-utility/src/main/java/org/apache/sis/internal/jdk17/Record.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sis.internal.jdk17;
+
+
+/**
+ * Placeholder for the {@code java.lang.Record} class introduced in Java 16.
+ * This is used for making transition easier when SIS will be ready to upgrade
to Java 17.
+ *
+ * @author Martin Desruisseaux (Geomatys)
+ * @since 1.4
+ * @version 1.4
+ * @module
+ */
+public abstract class Record {
+ /**
+ * Creates a new record.
+ */
+ protected Record() {
+ }
+}
diff --git
a/core/sis-utility/src/main/java/org/apache/sis/util/collection/RangeSet.java
b/core/sis-utility/src/main/java/org/apache/sis/util/collection/RangeSet.java
index 5ce8e281ce..b1e07b8de9 100644
---
a/core/sis-utility/src/main/java/org/apache/sis/util/collection/RangeSet.java
+++
b/core/sis-utility/src/main/java/org/apache/sis/util/collection/RangeSet.java
@@ -23,6 +23,7 @@ import java.io.ObjectOutputStream;
import java.io.ObjectStreamException;
import java.lang.reflect.Array;
import java.util.Arrays;
+import java.util.Objects;
import java.util.Iterator;
import java.util.Comparator;
import java.util.SortedSet;
@@ -101,7 +102,7 @@ import static org.apache.sis.util.Numbers.*;
*
* @author Martin Desruisseaux (Geomatys)
* @author Rémi Maréchal (Geomatys)
- * @version 0.5
+ * @version 1.4
*
* @param <E> the type of range elements.
*
@@ -435,6 +436,9 @@ public class RangeSet<E extends Comparable<? super E>>
extends AbstractSet<Range
* @param upper index after the last value to examine.
*/
final int binarySearch(final E value, final int lower, final int upper) {
+ if (array == null || value == null) {
+ return -1;
+ }
switch (elementCode) {
case DOUBLE: return Arrays.binarySearch((double[]) array,
lower, upper, (Double) value);
case FLOAT: return Arrays.binarySearch((float[]) array,
lower, upper, (Float) value);
@@ -526,7 +530,7 @@ public class RangeSet<E extends Comparable<? super E>>
extends AbstractSet<Range
* is not present neither in this set (i1 < 0) and if its
insertion point is the
* same, then insert the new range in the space between two
existing ranges.
*/
- if (i0 == ~i1) { // Includes the (i0 == length) case.
+ if (i0 == ~i1) { // Includes the (i0 == length)
case.
ensureOrdered(minValue, maxValue);
insertAt(i0, minValue, maxValue);
return true;
@@ -551,9 +555,9 @@ public class RangeSet<E extends Comparable<? super E>>
extends AbstractSet<Range
* minValue: │ (insertion point i0 == 3)
* moving i0: ├────┘
*/
- i0 &= ~1; // Equivalent to executing i0-- only when i0 is odd.
+ i0 &= ~1; // Equivalent to executing i0-- only when
i0 is odd.
if (i1 < 0) {
- i1 = ~i1; // Really tild operator, not minus sign.
+ i1 = ~i1; // Really tild operator, not minus sign.
if ((i1 & 1) == 0) {
/*
* If the "insertion point" is outside any existing range,
expand the previous
@@ -646,7 +650,7 @@ public class RangeSet<E extends Comparable<? super E>>
extends AbstractSet<Range
public boolean remove(final E minValue, final E maxValue) throws
IllegalArgumentException {
ArgumentChecks.ensureNonNull("minValue", minValue);
ArgumentChecks.ensureNonNull("maxValue", maxValue);
- if (length == 0) return false; // Nothing to do if no data.
+ if (length == 0) return false; // Nothing to do
if no data.
ensureOrdered(minValue, maxValue);
// Search insertion index.
@@ -1045,7 +1049,7 @@ public class RangeSet<E extends Comparable<? super E>>
extends AbstractSet<Range
if (lower < 0) {
lower = ~lower;
}
- lower &= ~1; // Force the index to even value.
+ lower &= ~1; // Force the index to even value.
}
if (maxValue != null) {
upper = binarySearch(maxValue, lower, upper);
@@ -1053,8 +1057,8 @@ public class RangeSet<E extends Comparable<? super E>>
extends AbstractSet<Range
upper = ~upper;
}
/*
- * If 'upper' is even (i.e. is the index of a minimal
value), keep that index
- * unchanged because this value is exclusive. But if
'upper' is odd (i.e. is
+ * If `upper` is even (i.e. is the index of a minimal
value), keep that index
+ * unchanged because this value is exclusive. But if
`upper` is odd (i.e. is
* the index of a maximal value), move to the minimal
value of the next range.
*/
upper = (upper + 1) & ~1;
@@ -1398,7 +1402,7 @@ public class RangeSet<E extends Comparable<? super E>>
extends AbstractSet<Range
* Found an insertion point. Make sure that the insertion
* point is inside a range (i.e. before the maximum value).
*/
- index = ~index; // Tild sign, not minus.
+ index = ~index; // Tild sign, not minus.
if ((index & 1) == 0) {
return -1;
}
@@ -1406,8 +1410,7 @@ public class RangeSet<E extends Comparable<? super E>>
extends AbstractSet<Range
// The value is equal to an excluded endpoint.
return -1;
}
- index /= 2; // Round toward 0 (odd index are maximum
values).
- return index;
+ return index >>> 1; // Round toward 0 (odd index are maximum
values).
}
/**
@@ -1417,15 +1420,12 @@ public class RangeSet<E extends Comparable<? super E>>
extends AbstractSet<Range
* Widening conversions are performed as needed.
*
* @param index the range index, from 0 inclusive to {@link #size()
size} exclusive.
- * @return the minimum value for the range at the specified index,
inclusive.
+ * @return the minimum value for the range at the specified index.
* @throws IndexOutOfBoundsException if {@code index} is out of bounds.
* @throws ClassCastException if range elements are not convertible to
{@code long}.
*/
- public long getMinLong(int index) throws IndexOutOfBoundsException,
ClassCastException {
- if ((index *= 2) >= length) {
- throw new IndexOutOfBoundsException();
- }
- return Array.getLong(array, index);
+ public long getMinLong(final int index) throws IndexOutOfBoundsException,
ClassCastException {
+ return Array.getLong(array, Objects.checkIndex(index, length >>> 1) <<
1);
}
/**
@@ -1435,17 +1435,14 @@ public class RangeSet<E extends Comparable<? super E>>
extends AbstractSet<Range
* Widening conversions are performed as needed.
*
* @param index the range index, from 0 inclusive to {@link #size()
size} exclusive.
- * @return the minimum value for the range at the specified index,
inclusive.
+ * @return the minimum value for the range at the specified index.
* @throws IndexOutOfBoundsException if {@code index} is out of bounds.
* @throws ClassCastException if range elements are not convertible to
numbers.
*
* @see org.apache.sis.measure.NumberRange#getMinDouble()
*/
- public double getMinDouble(int index) throws IndexOutOfBoundsException,
ClassCastException {
- if ((index *= 2) >= length) {
- throw new IndexOutOfBoundsException();
- }
- return Array.getDouble(array, index);
+ public double getMinDouble(final int index) throws
IndexOutOfBoundsException, ClassCastException {
+ return Array.getDouble(array, Objects.checkIndex(index, length >>> 1)
<< 1);
}
/**
@@ -1455,15 +1452,12 @@ public class RangeSet<E extends Comparable<? super E>>
extends AbstractSet<Range
* Widening conversions are performed as needed.
*
* @param index the range index, from 0 inclusive to {@link #size()
size} exclusive.
- * @return the maximum value for the range at the specified index,
inclusive.
+ * @return the maximum value for the range at the specified index.
* @throws IndexOutOfBoundsException if {@code index} is out of bounds.
* @throws ClassCastException if range elements are not convertible to
{@code long}.
*/
- public long getMaxLong(int index) throws IndexOutOfBoundsException,
ClassCastException {
- if ((index *= 2) >= length) {
- throw new IndexOutOfBoundsException();
- }
- return Array.getLong(array, index + 1);
+ public long getMaxLong(final int index) throws IndexOutOfBoundsException,
ClassCastException {
+ return Array.getLong(array, (Objects.checkIndex(index, length >>> 1)
<< 1) | 1);
}
/**
@@ -1473,17 +1467,14 @@ public class RangeSet<E extends Comparable<? super E>>
extends AbstractSet<Range
* Widening conversions are performed as needed.
*
* @param index the range index, from 0 inclusive to {@link #size size}
exclusive.
- * @return the maximum value for the range at the specified index,
exclusive.
+ * @return the maximum value for the range at the specified index.
* @throws IndexOutOfBoundsException if {@code index} is out of bounds.
* @throws ClassCastException if range elements are not convertible to
numbers.
*
* @see org.apache.sis.measure.NumberRange#getMaxDouble()
*/
- public double getMaxDouble(int index) throws IndexOutOfBoundsException,
ClassCastException {
- if ((index *= 2) >= length) {
- throw new IndexOutOfBoundsException();
- }
- return Array.getDouble(array, index + 1);
+ public double getMaxDouble(final int index) throws
IndexOutOfBoundsException, ClassCastException {
+ return Array.getDouble(array, (Objects.checkIndex(index, length >>> 1)
<< 1) | 1);
}
/**
diff --git
a/core/sis-utility/src/main/java/org/apache/sis/util/collection/package-info.java
b/core/sis-utility/src/main/java/org/apache/sis/util/collection/package-info.java
index 1dcb2fc8d2..7b25905c19 100644
---
a/core/sis-utility/src/main/java/org/apache/sis/util/collection/package-info.java
+++
b/core/sis-utility/src/main/java/org/apache/sis/util/collection/package-info.java
@@ -51,7 +51,7 @@
* </ul>
*
* @author Martin Desruisseaux (IRD, Geomatys)
- * @version 1.3
+ * @version 1.4
* @since 0.3
* @module
*/
diff --git a/pom.xml b/pom.xml
index 3e85c4c59f..1d1a67e75c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -39,7 +39,7 @@
<parent>
<groupId>org.apache</groupId>
<artifactId>apache</artifactId>
- <version>28</version>
+ <version>29</version>
</parent>
diff --git
a/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/ChannelData.java
b/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/ChannelData.java
index cc4150ea86..73cd926b3e 100644
---
a/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/ChannelData.java
+++
b/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/ChannelData.java
@@ -327,7 +327,8 @@ public abstract class ChannelData implements Markable {
}
/**
- * Moves to the given position in the stream, relative to the stream
position at construction time.
+ * Moves to the given position in the stream. The given position is
relative to
+ * the position that the stream had at {@code ChannelData} construction
time.
*
* @param position the position where to move.
* @throws IOException if the stream cannot be moved to the given position.
diff --git
a/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/ChannelDataInput.java
b/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/ChannelDataInput.java
index 99e54c8be8..45802f35ff 100644
---
a/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/ChannelDataInput.java
+++
b/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/ChannelDataInput.java
@@ -60,7 +60,7 @@ import static
org.apache.sis.util.ArgumentChecks.ensureBetween;
* {@link javax.imageio} is needed.
*
* @author Martin Desruisseaux (Geomatys)
- * @version 1.3
+ * @version 1.4
* @since 0.3
* @module
*/
@@ -899,7 +899,8 @@ public class ChannelDataInput extends ChannelData {
}
/**
- * Moves to the given position in the stream, relative to the stream
position at construction time.
+ * Moves to the given position in the stream. The given position is
relative to
+ * the position that the stream had at {@code ChannelDataInput}
construction time.
*
* @param position the position where to move.
* @throws IOException if the stream cannot be moved to the given position.
diff --git
a/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/FileCacheByteChannel.java
b/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/FileCacheByteChannel.java
new file mode 100644
index 0000000000..d118eba197
--- /dev/null
+++
b/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/FileCacheByteChannel.java
@@ -0,0 +1,637 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sis.internal.storage.io;
+
+import java.util.OptionalLong;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.StandardOpenOption;
+import java.nio.channels.FileChannel;
+import java.nio.channels.SeekableByteChannel;
+import java.nio.channels.NonWritableChannelException;
+import org.apache.sis.internal.storage.Resources;
+import org.apache.sis.internal.util.Strings;
+import org.apache.sis.util.ArgumentChecks;
+import org.apache.sis.util.ArraysExt;
+import org.apache.sis.util.CharSequences;
+import org.apache.sis.util.resources.Errors;
+import org.apache.sis.util.collection.RangeSet;
+
+
+/**
+ * A seekable byte channel which copies data from an input stream to a
temporary file.
+ * This class can be used for wrapping HTTP or S3 connections for use with
{@link ChannelDataInput}.
+ * Characteristics:
+ *
+ * <ul>
+ * <li>Bytes read from the input stream are cached in a temporary file for
making backward seeks possible.</li>
+ * <li>The number of bytes of interest {@linkplain #position(long, long) can
be specified}.
+ * It makes possible to specify the range of bytes to download with HTTP
connections.</li>
+ * <li>This implementation is thread-safe.</li>
+ * <li>Current implementation is read-only.</li>
+ * </ul>
+ *
+ * @author Martin Desruisseaux (Geomatys)
+ * @version 1.4
+ * @since 1.4
+ * @module
+ */
+public abstract class FileCacheByteChannel implements SeekableByteChannel {
+ /**
+ * Size of the transfer buffer, in number of bytes.
+ */
+ private static final int BUFFER_SIZE = 8 * 1024;
+
+ /**
+ * Threshold for implementing a change of position by closing current
connection and opening a new one.
+ * If the number of bytes to skip is smaller than this threshold, then we
will rather continue reading
+ * with the current input stream.
+ *
+ * <p>For an average download speed of 25 Mb/s, downloading 64 kB requires
about 1/50 of second.</p>
+ */
+ static final int SKIP_THRESHOLD = 64 * 1024;
+
+ /**
+ * The unit of ranges used in HTTP connections.
+ */
+ protected static final String RANGES_UNIT = "bytes";
+
+ /**
+ * Information about an input stream and its range of bytes.
+ * This is the return value of {@link #openConnection(long, long)}.
+ */
+ protected static final class Connection extends
org.apache.sis.internal.jdk17.Record {
+ /** The input stream for reading the bytes. */
+ final InputStream input;
+
+ /** Position of the first byte read by the input stream (inclusive). */
+ final long start;
+
+ /** Position of the last byte read by the input stream (inclusive). */
+ final long end;
+
+ /** Total length of the stream, or -1 if unknown. */
+ final long length;
+
+ /** Whether connection can be created for ranges of bytes. */
+ final boolean acceptRanges;
+
+ /**
+ * Creates information about a connection.
+ *
+ * @param input the input stream for reading the bytes.
+ * @param start position of the first byte read by the input
stream (inclusive).
+ * @param end position of the last byte read by the input
stream (inclusive).
+ * @param length total length of the stream, or -1 if unknown.
+ * @param acceptRanges whether connection can be created for ranges
of bytes.
+ *
+ * @see #openConnection(long, long)
+ */
+ public Connection(final InputStream input, final long start, final
long end, final long length, final boolean acceptRanges) {
+ this.input = input;
+ this.start = start;
+ this.end = end;
+ this.length = length;
+ this.acceptRanges = acceptRanges;
+ }
+
+ /**
+ * Creates information about a connection by parsing HTTP header.
+ * Example: "Content-Range: bytes 25000-75000/100000".
+ *
+ * @param input the input stream for reading the bytes.
+ * @param contentRange value of "Content-Range" in HTTP header.
+ * @param acceptRanges value of "Accept-Ranges" in HTTP header.
+ * @param contentLength total length of the stream.
+ * @throws IllegalArgumentException if the start, end or length cannot be parsed.
+ */
+ public Connection(final InputStream input, String contentRange, final
Iterable<String> acceptRanges,
+ final OptionalLong contentLength)
+ {
+ this.input = input;
+ contentRange = contentRange.trim();
+ int s = contentRange.indexOf(' ');
+ if (s >= 0 && (s != RANGES_UNIT.length() ||
!contentRange.regionMatches(true, 0, RANGES_UNIT, 0, s))) {
+ throw new
IllegalArgumentException(Errors.format(Errors.Keys.UnsupportedArgumentValue_1,
contentRange));
+ }
+ int rs = contentRange.indexOf('-', ++s); //
Index of range separator.
+ int ls = contentRange.indexOf('/', Math.max(s, rs+1)); //
Index of length separator.
+ if (contentLength.isPresent()) {
+ length = contentLength.getAsLong();
+ } else if (ls >= 0) {
+ String t = contentRange.substring(ls+1).trim();
+ length = t.equals("*") ? -1 : Long.parseLong(t);
+ } else {
+ length = -1;
+ }
+ if (ls < 0) ls = contentRange.length();
+ if (rs < 0) rs = ls;
+ start = Long.parseLong(contentRange.substring(s, rs).trim());
+ end = (rs < ls) ? Long.parseLong(contentRange.substring(rs+1,
ls).trim()) : length;
+ this.acceptRanges = acceptRanges(acceptRanges);
+ }
+
+ /**
+ * Returns {@code true} if the given "Accept-Ranges" values contains
at least one "bytes" string.
+ *
+ * @param values HTTP header value for "Accept-Ranges".
+ * @return whether the values contains at least one "bytes" string.
+ */
+ public static boolean acceptRanges(final Iterable<String> values) {
+ for (final String t : values) {
+ if (ArraysExt.containsIgnoreCase((String[])
CharSequences.split(t, ','), RANGES_UNIT)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Returns a string representation for debugging purposes.
+ */
+ @Override
+ public String toString() {
+ return Strings.toString(getClass(), "start", start, "end", end);
+ }
+ }
+
+ /**
+ * The source of bytes to read, or {@code null} if the connection has not
yet been established.
+ * The stream can be closed and replaced by another stream if various
connections are opened
+ * for downloading various ranges of bytes. When a new stream is created,
the position of the
+ * {@linkplain #file} channel shall be synchronized with the input stream
position (taking in
+ * account the start of the range).
+ *
+ * @see #openConnection()
+ * @see #openConnection(long, long)
+ * @see #abort(InputStream)
+ */
+ private Connection connection;
+
+ /**
+ * Input/output channel on the temporary or cached file where data are
copied.
+ * The {@linkplain FileChannel#position() position of this file channel}
shall
+ * be the current position of the {@linkplain Connection#input input
stream}.
+ *
+ * <h4>Space consumption</h4>
+ * This channel should be opened on a sparse file.
+ * To check if a file is sparse, the outputs of following shell commands
can be compared:
+ *
+ * {@preformat shell
+ * ls -l the-temporary-file
+ * du --block-size=1 the-temporary-file
+ * }
+ *
+ * @see <a href="https://en.wikipedia.org/wiki/Sparse_file">Sparse file on
Wikipedia</a>
+ */
+ private final FileChannel file;
+
+ /**
+ * A temporary buffer for transferring data when we cannot write directly
in the destination buffer.
+ * It shall be a buffer backed by a Java array, not a direct buffer.
Created when first needed.
+ */
+ private ByteBuffer transfer;
+
+ /**
+ * Current position of this channel. The first byte of this channel is
always at position zero.
+ *
+ * @see #position()
+ */
+ private long position;
+
+ /**
+ * Position after the last requested byte, or ≤ {@linkplain #position} if
unknown.
+ * It can be used for specifying the range of bytes to download from an
HTTP connection.
+ *
+ * @see #position(long, long)
+ */
+ private long endOfInterest;
+
+ /**
+ * Ranges of bytes in the {@linkplain #file} where data are valid.
+ */
+ private final RangeSet<Long> rangesOfAvailableBytes;
+
+ /**
+ * Number of bytes in the full stream, or 0 if not yet computed.
+ */
+ private long length;
+
+ /**
+ * Creates a new channel which will cache bytes in a temporary file.
+ * The source of bytes will be provided by {@link #openConnection(long,
long)}.
+ *
+ * @param prefix prefix of the temporary file to create.
+ * @throws IOException if the temporary file can not be created.
+ */
+ protected FileCacheByteChannel(final String prefix) throws IOException {
+ rangesOfAvailableBytes = RangeSet.create(Long.class, true, false);
+ file = FileChannel.open(Files.createTempFile(prefix, null),
+ StandardOpenOption.READ,
+ StandardOpenOption.WRITE,
+ StandardOpenOption.SPARSE,
+ StandardOpenOption.TRUNCATE_EXISTING,
+ StandardOpenOption.DELETE_ON_CLOSE);
+ }
+
+ /**
+ * Returns the filename to use in error messages.
+ *
+ * @return a filename for error messages.
+ */
+ protected abstract String filename();
+
+ /**
+ * Creates an input stream which provides the bytes to read starting at
the specified position.
+ * If the caller needs only a sub-range of bytes, then the end of the
desired range is specified.
+ * That end is only a hint and can be ignored.
+ *
+ * @param start position of the first byte to read (inclusive).
+ * @param end position of the last byte to read with the returned
stream (inclusive),
+ * or {@link Long#MAX_VALUE} for end of stream.
+ * @return information about the input stream providing the bytes to read
starting at the given start position.
+ * @throws IOException if the connection can not be established.
+ */
+ protected abstract Connection openConnection(long start, long end) throws
IOException;
+
+ /**
+ * Invoked when this channel is no longer interested in reading bytes from
the specified stream.
+ * This method is invoked for example when this channel needs to skip an
arbitrarily large number
+ * of bytes because the {@linkplain #position(long) position changed}. The
{@code input} argument
+ * is the value in the record returned by a previous call to {@link
#openConnection(long, long)}.
+ * The boolean return value tells what this method has done:
+ *
+ * <ul class="verbose">
+ * <li>If this method returns {@code true}, then the given stream has
been closed by this method and this
+ * channel is ready to create a new stream on the next call to
{@link #openConnection(long, long)}.</li>
+ * <li>If this method returns {@code false}, then the given stream is
still alive and should continue to be used.
+ * The {@link #openConnection(long, long)} method will <em>not</em>
be invoked.
+ * Instead, bytes will be skipped by reading them from the current
input stream and caching them.</li>
+ * </ul>
+ *
+ * @param input the input stream to eventually close.
+ * @return whether the given input stream has been closed by this method.
If {@code false},
+ * then this channel should continue to use that input stream
instead of opening a new connection.
+ * @throws IOException if an error occurred while closing the stream or
preparing for next read operations.
+ */
+ protected boolean abort(InputStream input) throws IOException {
+ return false;
+ }
+
+    /**
+     * Returns the number of bytes in the input stream.
+     * This method only returns the cached {@link #length} field; it does not open a connection.
+     * Consequently the returned value is 0 as long as no connection has reported a stream length.
+     *
+     * @return number of bytes in the input stream, or 0 if not yet known.
+     * @throws IOException if the information is not available
+     *         (declared by the interface; not thrown by the current implementation).
+     */
+    @Override
+    public synchronized long size() throws IOException {
+        return length;
+    }
+
+ /**
+ * Returns this channel's position.
+ * The first byte read by this channel is always at position zero.
+ *
+ * @return the current channel position.
+ */
+ @Override
+ public synchronized long position() {
+ return position;
+ }
+
+    /**
+     * Sets this channel's position.
+     * No I/O is performed by this method: it only records the new position,
+     * which is taken into account by the next call to {@link #read(ByteBuffer)}.
+     * If the previously declared end of the range of interest is behind the new position
+     * or less than {@link #SKIP_THRESHOLD} bytes after it, that end is forgotten
+     * and the next connection will be requested up to the end of stream.
+     *
+     * @param  newPosition  number of bytes from the beginning to the desired position.
+     *                      Validated by {@code ArgumentChecks.ensurePositive}.
+     * @return {@code this} for method call chaining.
+     * @throws IOException if an I/O error occurs.
+     */
+    @Override
+    public synchronized SeekableByteChannel position(final long newPosition)
+    throws IOException {
+        ArgumentChecks.ensurePositive("newPosition", newPosition);
+        position = newPosition;
+        // A value not greater than `position` means "unspecified" (see `openConnection()`).
+        if (endOfInterest - newPosition < SKIP_THRESHOLD) {
+            endOfInterest = 0; // Read until end of stream.
+        }
+        return this;
+    }
+
+ /**
+ * Sets this channel's position together with the number of bytes to read.
+ * The number of bytes is only a hint and may be ignored, depending on
subclasses.
+ * Reading more bytes than specified is okay, only potentially less
efficient.
+ *
+ * @param newPosition number of bytes from the beginning to the desired
position.
+ * @param count expected number of bytes to read.
+ * @throws IOException if an I/O error occurs.
+ */
+ final synchronized void position(final long newPosition, final long count)
throws IOException {
+ ArgumentChecks.ensurePositive("newPosition", newPosition);
+ ArgumentChecks.ensureStrictlyPositive("count", count);
+ position = newPosition;
+ endOfInterest = newPosition + count; // Overflow is okay here (will
read until end of stream).
+ }
+
+ /**
+ * Opens a connection on the range of bytes determined by the current
channel position.
+ * The {@link #endOfInterest} position is considered unspecified if not
greater than
+ * {@link #position} (it may be 0).
+ *
+ * @return the opened connection (never {@code null}).
+ * @throws IOException if the connection can not be established.
+ */
+ private Connection openConnection() throws IOException {
+ long end = endOfInterest;
+ if (end > position) end--; // Make inclusive.
+ else end = Long.MAX_VALUE;
+ var c = openConnection(position, end);
+ file.position(c.start);
+ if (c.length >= 0) {
+ length = c.length;
+ }
+ connection = c; // Set only on success.
+ return c;
+ }
+
+ /**
+ * Returns a buffer for transferring bytes from the input stream to the
cache.
+ * This buffer must be backed by a Java array (not a direct buffer).
+ */
+ private ByteBuffer transfer() {
+ if (transfer == null) {
+ transfer = ByteBuffer.allocate(BUFFER_SIZE);
+ }
+ return transfer;
+ }
+
+    /**
+     * Tries to move the input stream by skipping the specified amount of bytes.
+     * This method is invoked when the source of input streams (the server) does not support ranges,
+     * or when the number of bytes to skip is too small for being worth to create a new connection.
+     * This method may skip less bytes than requested. The skipped bytes are saved in the cache.
+     *
+     * @param  count  number of bytes to skip.
+     * @return remaining number of bytes to skip after this method execution.
+     *         A non-zero value means that the end of the input stream has been reached.
+     * @throws IOException if {@code count} is negative (the stream cannot go backward)
+     *         or if an I/O error occurred.
+     */
+    private long skipInInput(long count) throws IOException {
+        if (count < 0) {
+            throw new IOException(Resources.format(Resources.Keys.StreamIsReadOnce_1, filename()));
+        } else if (count != 0) {
+            final InputStream input = connection.input;
+            final ByteBuffer buffer = transfer();
+            do {
+                buffer.clear();
+                if (count < BUFFER_SIZE) {
+                    buffer.limit((int) count);
+                }
+                int n = input.read(buffer.array(), 0, buffer.limit());
+                if (n <= 0) {
+                    if (n != 0 || (n = input.read()) < 0) {     // Block until we get one byte.
+                        break;                                  // End of stream, but maybe it was a sub-range.
+                    }
+                    /*
+                     * Absolute put: the buffer position must stay at 0, otherwise the
+                     * [position … limit) range given to `cache(…)` below would be empty.
+                     */
+                    buffer.put(0, (byte) n);
+                    n = 1;
+                }
+                cache(buffer.limit(n));
+                count -= n;
+            } while (count > 0);
+        }
+        return count;
+    }
+
+    /**
+     * Attempts to read up to <i>r</i> bytes from the channel,
+     * where <i>r</i> is the number of bytes remaining in the buffer.
+     * Bytes are written in the given buffer starting at the current position.
+     * Upon return, the buffer position is advanced by the number of bytes read.
+     * Bytes are taken from the cache when available, otherwise they are downloaded
+     * from the input stream and copied in the cache at the same time.
+     *
+     * @param  dst  the buffer where to store the bytes that are read.
+     * @return number of bytes read, or -1 if end of file.
+     * @throws EOFException if the input stream ended before the channel position has been reached.
+     * @throws IOException if an error occurred when reading the bytes.
+     */
+    @Override
+    public synchronized int read(final ByteBuffer dst) throws IOException {
+        /*
+         * If the channel position is inside a range of cached data (i.e. a backward seek has been done before),
+         * use those data without downloading more data from the input stream. Maybe available data are enough.
+         */
+        int count = readFromCache(dst);
+        if (count >= 0) {
+            return count;
+        }
+        /*
+         * If we reach this point, the byte at `position` is not in the cache.
+         * If a connection exists, we need to either discard it or skip bytes.
+         *
+         * NOTE(review): when `offset` is 0 the existing connection is always reused, even if
+         * `position > c.end`. If the stream was a sub-range which just ended, the -1 returned
+         * by the stream is propagated to the caller. Confirm whether a new connection is desired.
+         */
+        Connection c = connection;
+        long offset = position - file.position();
+        if (offset != 0 && c != null) {
+            if ((offset < 0 || (c.acceptRanges && (offset >= SKIP_THRESHOLD || position > c.end)))) {
+                offset -= drainAndAbort();
+                c = connection;
+            }
+        }
+        /*
+         * At this point we need to download data from the input stream.
+         * If previous connection can not be used, open a new one for the range of bytes to read.
+         * Then skip all bytes between the current position and the requested position.
+         * Those bytes will be saved in the cache.
+         */
+        if (c == null) {
+            c = openConnection();
+            offset = position - c.start;
+        }
+        offset = skipInInput(offset);
+        if (offset != 0) {
+            count = readFromCache(dst);
+            if (count >= 0) {
+                return count;
+            }
+            throw new EOFException(Errors.format(Errors.Keys.ValueOutOfRange_4,
+                                   "position", 0, length, position));
+        }
+        /*
+         * Get a buffer that we can use with `InputStream.read(byte[])`.
+         * It must be a buffer backed by a Java array. If we have to use the transfer buffer,
+         * the number of bytes to read is clamped to its capacity: `Buffer.limit(int)` throws
+         * `IllegalArgumentException` for a limit greater than the capacity, and reading less
+         * bytes than requested is allowed by the channel contract.
+         */
+        final ByteBuffer buffer;
+        if (dst.hasArray()) {
+            buffer = dst;
+        } else {
+            buffer = transfer();
+            buffer.clear().limit(Math.min(dst.remaining(), buffer.capacity()));
+        }
+        /*
+         * Transfer bytes from the input stream to the buffer.
+         * The bytes are also copied to the temporary file.
+         */
+        final int limit = buffer.limit();
+        final int start = buffer.position();
+        count = c.input.read(buffer.array(), Math.addExact(buffer.arrayOffset(), start), buffer.remaining());
+        if (count > 0) {
+            try {
+                cache(buffer.limit(start + count));     // Advances the buffer position by `count`.
+            } finally {
+                buffer.limit(limit);
+            }
+            if (buffer != dst) {
+                dst.put(buffer.flip());                 // Transfer temporary to destination buffer.
+            }
+            position += count;
+        }
+        return count;
+    }
+
+    /**
+     * Attempts to read bytes from the cache, up to the number of bytes remaining in the given buffer.
+     * Only the range of cached bytes which contains the current channel position is used,
+     * so this method may read less bytes than the remaining capacity of the buffer.
+     * On success, the channel position is advanced by the number of bytes read.
+     *
+     * @param  dst  the buffer where to store the bytes that are read.
+     * @return number of bytes read, or -1 if the cache does not contain the requested range of bytes.
+     * @throws IOException if an error occurred when reading the bytes.
+     */
+    private int readFromCache(final ByteBuffer dst) throws IOException {
+        final int indexOfRange = rangesOfAvailableBytes.indexOfRange(position);
+        if (indexOfRange < 0) {
+            return -1;
+        }
+        final long endOfCache = rangesOfAvailableBytes.getMaxLong(indexOfRange);
+        final int limit = dst.limit();
+        final int start = dst.position();
+        final int end   = (int) Math.min(limit, start + (endOfCache - position));
+        final int count;
+        try {
+            count = file.read(dst.limit(end), position);
+        } finally {
+            dst.limit(limit);
+        }
+        /*
+         * `FileChannel.read(…)` returns -1 if the given position is at or after the end of file.
+         * It should not happen for a range declared available, but if it does we must not move
+         * the channel position backward. The -1 value tells the caller to download the data.
+         */
+        if (count > 0) {
+            position += count;
+        }
+        return count;
+    }
+
+    /**
+     * Writes fully the given buffer in the cache {@linkplain #file}.
+     * The data to write starts at current buffer position and stops at the buffer limit.
+     * This method changes the {@link Connection#input} and {@link #file} positions by the same amount.
+     * Each range of bytes written is recorded in {@link #rangesOfAvailableBytes}.
+     *
+     * <p>The given buffer shall have at least one remaining byte: the loop body is executed
+     * at least once and a write of zero byte is handled as an error.</p>
+     *
+     * @param  buffer  buffer containing data to cache.
+     * @throws IOException if an error occurred while writing in the file,
+     *         or if a write operation did not write any byte.
+     */
+    private void cache(final ByteBuffer buffer) throws IOException {
+        do {
+            final long start = file.position();
+            final int count = file.write(buffer);
+            if (count <= 0) {
+                // Should never happen, but check anyway as a safety against never-ending loop.
+                throw new IOException();
+            }
+            long end = start + count;
+            if (end < start) end = Long.MAX_VALUE; // Safety against overflow.
+            rangesOfAvailableBytes.add(start, end);
+        } while (buffer.hasRemaining());
+    }
+
+    /**
+     * Reads and caches the bytes that are already available in the input stream, then aborts download.
+     * Only the bytes reported by {@link InputStream#available()} are read, in order to avoid blocking.
+     * This method may set {@link #connection} to null, depending on the value returned by
+     * {@link #abort(InputStream)}.
+     *
+     * @return number of bytes that have been read.
+     * @throws IOException if an I/O error occurred.
+     */
+    private long drainAndAbort() throws IOException {
+        long count = 0;
+        final InputStream input = connection.input;
+        for (int c; (c = input.available()) > 0;) {
+            final ByteBuffer buffer = transfer();
+            buffer.clear();
+            if (c < BUFFER_SIZE) buffer.limit(c);
+            final int n = input.read(buffer.array(), 0, buffer.limit());
+            /*
+             * Stop on end of stream (n < 0), but also if no byte has been read (n = 0).
+             * The latter should not happen, but `cache(…)` would throw an exception
+             * if invoked with an empty buffer.
+             */
+            if (n <= 0) break;
+            cache(buffer.limit(n));
+            count += n;
+        }
+        if (abort(input)) {
+            connection = null;
+        }
+        return count;
+    }
+
+ /**
+ * Attempts to write up to <i>r</i> bytes to the channel,
+ * where <i>r</i> is the number of bytes remaining in the buffer.
+ * Bytes are read from the given buffer starting at the current position.
+ * Upon return, the buffer position is advanced by the number of bytes
written.
+ *
+ * <p>This channel is read-only: the default implementation always throws {@link NonWritableChannelException}.</p>
+ *
+ * @param src the buffer containing the bytes to write.
+ * @return number of bytes actually written.
+ * @throws IOException if an error occurred while writing the bytes.
+ */
+ @Override
+ public int write(final ByteBuffer src) throws IOException {
+ throw new NonWritableChannelException();
+ }
+
+ /**
+ * Truncates the file to the given size.
+ *
+ * @param size the new size in bytes.
+ * @throws IOException if the operation is not supported.
+ */
+ @Override
+ public SeekableByteChannel truncate(long size) throws IOException {
+ throw new NonWritableChannelException();
+ }
+
+ /**
+ * Tells whether this channel is open.
+ *
+ * @return {@code true} if this channel is open.
+ */
+ @Override
+ public boolean isOpen() { // No synchronization, rely on `FileChannel`
thread safety instead.
+ return file.isOpen();
+ }
+
+ /**
+ * Closes this channel and releases resources.
+ *
+ * @throws IOException if an error occurred while closing the channel.
+ */
+ @Override
+ public synchronized void close() throws IOException {
+ final Connection c = connection;
+ connection = null;
+ transfer = null;
+ try (file) {
+ if (c != null && !abort(c.input)) {
+ c.input.close();
+ }
+ }
+ }
+
+ /**
+ * Returns a string representation for debugging purpose.
+ */
+ @Override
+ public String toString() {
+ return Strings.toString(getClass(), "filename", filename(),
"position", position, "rangeCount", rangesOfAvailableBytes.size());
+ }
+}
diff --git
a/storage/sis-storage/src/test/java/org/apache/sis/internal/storage/io/ComputedInputStream.java
b/storage/sis-storage/src/test/java/org/apache/sis/internal/storage/io/ComputedInputStream.java
new file mode 100644
index 0000000000..953b357a6f
--- /dev/null
+++
b/storage/sis-storage/src/test/java/org/apache/sis/internal/storage/io/ComputedInputStream.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sis.internal.storage.io;
+
+import java.util.Random;
+import java.io.InputStream;
+
+import static org.junit.Assert.*;
+
+
+/**
+ * An input stream where each value is computed from the stream position.
+ * The byte values are the 1, 2, 3, …, 100, -1, -2, -3, … -100 series repeated
indefinitely.
+ *
+ * @author Martin Desruisseaux (Geomatys)
+ * @version 1.4
+ * @since 1.4
+ * @module
+ */
+final strictfp class ComputedInputStream extends InputStream {
+ /**
+ * Number of bytes in this stream.
+ */
+ private final int length;
+
+ /**
+ * The current stream position.
+ */
+ private int position;
+
+ /**
+ * Random value to be returned by {@link #available()}.
+ */
+ private int available;
+
+ /**
+ * Whether this input stream has been closed.
+ */
+ private boolean closed;
+
+ /**
+ * Generator of random numbers for controlling the behavior of this stream.
+ */
+ private final Random random;
+
+ /**
+ * Creates a new input stream of the given length.
+ *
+ * @param start position of the first byte to read.
+ * @param end position after the last byte to read.
+ * @param rg generator of random numbers for controlling the behavior
of this stream.
+ */
+ ComputedInputStream(final int start, final int end, final Random rg) {
+ assertTrue(start >= 0);
+ assertTrue(start <= end);
+ position = start;
+ length = end;
+ random = rg;
+ }
+
+ /**
+ * Returns the value at the given position.
+ *
+ * @param position the stream position where to get a value.
+ * @return value at the specified stream position.
+ */
+ static byte valueAt(final int position) {
+ int i = (position % 200) + 1;
+ if (i > 100) i = 100 - i;
+ return (byte) i;
+ }
+
+ /**
+ * Reads the next byte of data from the input stream.
+ */
+ @Override
+ public int read() {
+ assertFalse("closed", closed);
+ if (available != 0) available--;
+ return (position < length) ? Byte.toUnsignedInt(valueAt(position++)) :
-1;
+ }
+
+    /**
+     * Reads up to {@code count} bytes of data from the input stream into an array of bytes.
+     * This method randomly reads a smaller number of bytes than requested,
+     * and never reads past the end of this stream.
+     *
+     * @param  bytes   the buffer into which the data is read.
+     * @param  offset  the start offset at which the data is written.
+     * @param  count   the maximum number of bytes to read.
+     * @return the total number of bytes read into the buffer, or {@code -1} on EOF.
+     */
+    @Override
+    public int read(final byte[] bytes, int offset, int count) {
+        assertFalse("closed", closed);
+        assertNotNull(bytes);
+        assertTrue("Negative count", count >= 0);
+        assertTrue("Nagative offset", offset >= 0);
+        assertTrue("Out of bounds", offset + count <= bytes.length);
+        if (position >= length) {
+            return -1;
+        }
+        if (count != 0) {
+            /*
+             * Random number of bytes between 1 and `count` inclusive, limited to the number
+             * of bytes remaining in the stream. The limit must be computed from the stream
+             * position, not from the offset in the destination array.
+             */
+            count = Math.min(random.nextInt(count) + 1, length - position);
+            final int end = offset + count;
+            while (offset < end) {
+                bytes[offset++] = valueAt(position++);
+            }
+        }
+        if (available >= count) {
+            available -= count;
+        } else {
+            available = random.nextInt(100);
+        }
+        return count;
+    }
+
+ /**
+ * Returns an estimate of the number of bytes that can be read without
blocking.
+ *
+ * @return an estimate of the number of bytes available.
+ */
+ @Override
+ public int available() {
+ assertFalse("closed", closed);
+ return available;
+ }
+
+ /**
+ * Marks this input stream as closed.
+ */
+ @Override
+ public void close() {
+ closed = true;
+ }
+}
diff --git
a/storage/sis-storage/src/test/java/org/apache/sis/internal/storage/io/FileCacheByteChannelTest.java
b/storage/sis-storage/src/test/java/org/apache/sis/internal/storage/io/FileCacheByteChannelTest.java
new file mode 100644
index 0000000000..e33951c6b2
--- /dev/null
+++
b/storage/sis-storage/src/test/java/org/apache/sis/internal/storage/io/FileCacheByteChannelTest.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sis.internal.storage.io;
+
+import java.util.Random;
+import java.util.OptionalLong;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.List;
+import org.apache.sis.test.TestCase;
+import org.apache.sis.test.TestUtilities;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+
+/**
+ * Tests {@link FileCacheByteChannel}.
+ *
+ * @author Martin Desruisseaux (Geomatys)
+ * @version 1.4
+ * @since 1.4
+ * @module
+ */
+public final strictfp class FileCacheByteChannelTest extends TestCase {
+ /**
+ * The implementation used for testing purpose.
+ */
+ private static final class Implementation extends FileCacheByteChannel {
+ /**
+ * Name of the test method. Used for error messages only.
+ */
+ private final String name;
+
+ /**
+ * Number of bytes in the input stream to use for testing purpose.
+ * It should be large enough for forcing {@link #position(long, long)}
+ * to skip bytes instead of reading them.
+ */
+ private final int length;
+
+ /**
+ * Generator of random numbers for controlling the behavior of this
channel.
+ */
+ private final Random random;
+
+ /**
+ * Creates a new test channel.
+ *
+ * @param test a name to use for identifying the test in error
messages.
+ * @param rg generator of random numbers for controlling the
behavior of this channel.
+ * @throws IOException if the temporary file can not be created.
+ */
+ Implementation(final String test, final Random rg) throws IOException {
+ super("Test-");
+ name = test;
+ length = SKIP_THRESHOLD * 10 + rg.nextInt(SKIP_THRESHOLD * 10);
+ random = rg;
+ }
+
+ /**
+ * Returns a name to use in error messages.
+ */
+ @Override
+ protected String filename() {
+ return name;
+ }
+
+        /**
+         * Creates an input stream which provides the bytes to read starting at the specified position.
+         * The range of bytes is randomly expanded for testing the case where the server returns
+         * more bytes than requested.
+         *
+         * @param  start  position of the first byte to read (inclusive).
+         * @param  end    position of the last byte to read with the returned stream (inclusive),
+         *                or {@link Long#MAX_VALUE} for end of stream.
+         * @return contains the input stream providing the bytes to read.
+         */
+        @Override
+        protected Connection openConnection(long start, long end) {
+            assertTrue(end >= 0);
+            if (end >= length) end = length - 1;
+            start = Math.max(start - random.nextInt(40), 0);
+            end = Math.min(end + random.nextInt(40), length - 1);      // Inclusive.
+            /*
+             * `ComputedInputStream` expects the position after the last byte (exclusive)
+             * while `Connection` expects the position of the last byte (inclusive).
+             */
+            var input = new ComputedInputStream(Math.toIntExact(start), Math.toIntExact(end + 1), random);
+            return new Connection(input, start, end, length, true);
+        }
+
+ /**
+ * Marks the given input stream as closed and notify that a new one
can be created.
+ */
+ @Override
+ protected boolean abort(final InputStream input) throws IOException {
+ input.close();
+ return true;
+ }
+
+ /**
+ * Reads the next bytes from the channel and stores them in a random
region of the given buffer.
+ * On return, the buffer position is set on the first byte read and
the buffer limit is set after
+ * the last byte read.
+ *
+ * @param dst the buffer where to store the bytes that are read.
+ * @return {@code true} if bytes have been read, or {@code false} on
EOF.
+ * @throws IOException if an error occurred when reading or writing to
the temporary file.
+ */
+ final boolean readInRandomRegion(final ByteBuffer dst) throws
IOException {
+ int start = random.nextInt(dst.capacity());
+ int end = random.nextInt(dst.capacity());
+ if (start > end) {
+ int t = start;
+ start = end;
+ end = t;
+ }
+ final int n = read(dst.limit(end).position(start));
+ assertEquals("Number of bytes", Math.min(end - start, n), n);
+ assertEquals("Buffer position", start + Math.max(n, 0),
dst.flip().position(start).limit());
+ return n >= 0;
+ }
+ }
+
+ /**
+ * Tests random operations on a stream of computed values.
+ * The bytes values are determined by their position, which allows easy
verifications.
+ *
+ * @throws IOException if an error occurred when reading or writing to the
temporary file.
+ */
+ @Test
+ public void testRandomOperations() throws IOException {
+ final Random random = TestUtilities.createRandomNumberGenerator();
+ final Implementation channel = new Implementation("test", random);
+ final ByteBuffer buffer = ByteBuffer.allocate(random.nextInt(1000) +
1000);
+ int position = 0;
+ for (int i=0; i<10000; i++) {
+ assertTrue(channel.isOpen());
+ assertEquals(position, channel.position());
+ if (random.nextInt(4) == 0) {
+ position = random.nextInt(channel.length - 1);
+ int end = random.nextInt(channel.length - 1);
+ if (position > end) {
+ int t = position;
+ position = end;
+ end = t;
+ }
+ channel.position(position, end - position + 1);
+ }
+ channel.readInRandomRegion(buffer);
+ while (buffer.hasRemaining()) {
+ assertEquals(ComputedInputStream.valueAt(position++),
buffer.get());
+ }
+ }
+ assertEquals(position, channel.position());
+ channel.close(); // Intentionally no
"try with resource".
+ assertFalse(channel.isOpen());
+ }
+
+ /**
+ * Tests the constructor that parse HTTP ranges.
+ *
+ * @see FileCacheByteChannel.Connection#Connection(InputStream, String,
Iterable, OptionalLong)
+ */
+ @Test
+ public void testParseRange() {
+ FileCacheByteChannel.Connection c;
+ c = new FileCacheByteChannel.Connection(null, "bytes
25000-75000/100000", List.of("bytes"), OptionalLong.empty());
+ assertEquals( 25000, c.start);
+ assertEquals( 75000, c.end);
+ assertEquals(100000, c.length);
+
+ c = new FileCacheByteChannel.Connection(null, "bytes 25000-75000",
List.of("bytes"), OptionalLong.empty());
+ assertEquals( 25000, c.start);
+ assertEquals( 75000, c.end);
+ assertEquals( -1, c.length);
+
+ c = new FileCacheByteChannel.Connection(null, "bytes 25000/100000",
List.of("bytes"), OptionalLong.empty());
+ assertEquals( 25000, c.start);
+ assertEquals(100000, c.end);
+ assertEquals(100000, c.length);
+
+ // Not legal, but we test robustness.
+ c = new FileCacheByteChannel.Connection(null, "25000",
List.of("bytes"), OptionalLong.empty());
+ assertEquals( 25000, c.start);
+ assertEquals( -1, c.end);
+ assertEquals( -1, c.length);
+ }
+}
diff --git
a/storage/sis-storage/src/test/java/org/apache/sis/test/suite/StorageTestSuite.java
b/storage/sis-storage/src/test/java/org/apache/sis/test/suite/StorageTestSuite.java
index a669c14342..70dc7d492c 100644
---
a/storage/sis-storage/src/test/java/org/apache/sis/test/suite/StorageTestSuite.java
+++
b/storage/sis-storage/src/test/java/org/apache/sis/test/suite/StorageTestSuite.java
@@ -26,7 +26,7 @@ import org.junit.BeforeClass;
*
* @author Martin Desruisseaux (Geomatys)
* @author Alexis Manin (Geomatys)
- * @version 1.3
+ * @version 1.4
* @since 0.3
* @module
*/
@@ -40,6 +40,7 @@ import org.junit.BeforeClass;
org.apache.sis.internal.storage.io.ChannelImageOutputStreamTest.class,
org.apache.sis.internal.storage.io.HyperRectangleReaderTest.class,
org.apache.sis.internal.storage.io.RewindableLineReaderTest.class,
+ org.apache.sis.internal.storage.io.FileCacheByteChannelTest.class,
org.apache.sis.internal.storage.MetadataBuilderTest.class,
org.apache.sis.internal.storage.RangeArgumentTest.class,
org.apache.sis.internal.storage.MemoryGridResourceTest.class,