This is an automated email from the ASF dual-hosted git repository. desruisseaux pushed a commit to branch geoapi-4.0 in repository https://gitbox.apache.org/repos/asf/sis.git
The following commit(s) were added to refs/heads/geoapi-4.0 by this push: new 35a3827ce2 Move `CachedByteChannel` implementation from S3 module to `sis-storage` for sharing the code with other protocols requiring cache (e.g. HTTP). This new implementation tries to download only needed ranges of bytes. 35a3827ce2 is described below commit 35a3827ce2684841229f6e32fed4f99d62b56dc3 Author: Martin Desruisseaux <martin.desruisse...@geomatys.com> AuthorDate: Thu Dec 15 15:58:13 2022 +0100 Move `CachedByteChannel` implementation from S3 module to `sis-storage` for sharing the code with other protocols requiring cache (e.g. HTTP). This new implementation tries to download only needed ranges of bytes. --- cloud/pom.xml | 4 +- cloud/sis-cloud-aws/pom.xml | 2 +- .../apache/sis/cloud/aws/s3/CachedByteChannel.java | 293 ++-------- .../org/apache/sis/cloud/aws/s3/FileService.java | 32 +- .../java/org/apache/sis/internal/jdk17/JDK17.java | 19 + .../java/org/apache/sis/internal/jdk17/Record.java | 35 ++ .../org/apache/sis/util/collection/RangeSet.java | 61 +- .../apache/sis/util/collection/package-info.java | 2 +- pom.xml | 2 +- .../sis/internal/storage/io/ChannelData.java | 3 +- .../sis/internal/storage/io/ChannelDataInput.java | 5 +- .../internal/storage/io/FileCacheByteChannel.java | 637 +++++++++++++++++++++ .../internal/storage/io/ComputedInputStream.java | 149 +++++ .../storage/io/FileCacheByteChannelTest.java | 199 +++++++ .../apache/sis/test/suite/StorageTestSuite.java | 3 +- 15 files changed, 1160 insertions(+), 286 deletions(-) diff --git a/cloud/pom.xml b/cloud/pom.xml index 3a2959c1ce..4062f17c4b 100644 --- a/cloud/pom.xml +++ b/cloud/pom.xml @@ -92,8 +92,8 @@ =========================================================== --> <dependencies> <dependency> - <groupId>org.apache.sis.core</groupId> - <artifactId>sis-utility</artifactId> + <groupId>org.apache.sis.storage</groupId> + <artifactId>sis-storage</artifactId> <version>${project.version}</version> </dependency> diff 
--git a/cloud/sis-cloud-aws/pom.xml b/cloud/sis-cloud-aws/pom.xml index 3daa4a0340..0ec1620512 100644 --- a/cloud/sis-cloud-aws/pom.xml +++ b/cloud/sis-cloud-aws/pom.xml @@ -90,7 +90,7 @@ <dependency> <groupId>software.amazon.awssdk</groupId> <artifactId>s3</artifactId> - <version>2.18.25</version> + <version>2.18.40</version> </dependency> </dependencies> diff --git a/cloud/sis-cloud-aws/src/main/java/org/apache/sis/cloud/aws/s3/CachedByteChannel.java b/cloud/sis-cloud-aws/src/main/java/org/apache/sis/cloud/aws/s3/CachedByteChannel.java index 8ca227467a..135e2ba08f 100644 --- a/cloud/sis-cloud-aws/src/main/java/org/apache/sis/cloud/aws/s3/CachedByteChannel.java +++ b/cloud/sis-cloud-aws/src/main/java/org/apache/sis/cloud/aws/s3/CachedByteChannel.java @@ -16,272 +16,105 @@ */ package org.apache.sis.cloud.aws.s3; -import java.io.EOFException; +import java.util.List; +import java.util.OptionalLong; import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.file.Files; -import java.nio.file.StandardOpenOption; -import java.nio.channels.FileChannel; -import java.nio.channels.SeekableByteChannel; -import org.apache.sis.internal.util.Strings; -import org.apache.sis.util.ArgumentChecks; -import org.apache.sis.util.resources.Errors; -import software.amazon.awssdk.core.ResponseInputStream; +import java.io.InputStream; +import org.apache.sis.internal.storage.io.FileCacheByteChannel; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; import software.amazon.awssdk.services.s3.model.GetObjectResponse; +import software.amazon.awssdk.core.ResponseInputStream; +import software.amazon.awssdk.core.exception.SdkException; +import software.amazon.awssdk.http.Abortable; /** * A seekable byte channel which copies S3 data to a temporary file for caching purposes. 
* * @author Martin Desruisseaux (Geomatys) - * @version 1.2 + * @version 1.4 * @since 1.2 * @module */ -final class CachedByteChannel implements SeekableByteChannel { - /** - * Size of the transfer buffer, in number of bytes. - */ - private static final int BUFFER_SIZE = 8192; - - /** - * The input stream from which to read data. - */ - private final ResponseInputStream<GetObjectResponse> input; - - /** - * The file where data are copied. - */ - private final FileChannel file; - - /** - * A temporary buffer for transferring data when we - * cannot write directly in the destination buffer. - */ - private ByteBuffer transfer; - - /** - * Current position of this channel. - */ - private long position; - - /** - * Number of bytes in the temporary file. - * - * In current implementation this value shall be identical to {@code file.position()}. - * However, in a future implementation it will become different if we allow some parts - * of the file to be without data (sparse file), with data fetched using HTTP ranges. - */ - private long validLength; - +final class CachedByteChannel extends FileCacheByteChannel { /** - * Creates a new channel. + * Path to the S3 file to open. */ - CachedByteChannel(final ResponseInputStream<GetObjectResponse> stream) throws IOException { - input = stream; - file = FileChannel.open(Files.createTempFile("S3-", null), - StandardOpenOption.READ, StandardOpenOption.WRITE, - StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.DELETE_ON_CLOSE); - } + private final KeyPath path; /** - * Attempts to read up to <i>r</i> bytes from the channel, - * where <i>r</i> is the number of bytes remaining in the buffer. - * Bytes are written in the given buffer starting at the current position. - * Upon return, the buffer position is advanced by the number of bytes read. + * Creates a new channel for the S3 file identified by the given path. + * The connection will be opened when first needed. * - * @return number of bytes read, or -1 if end of file. 
- */ - @Override - public synchronized int read(final ByteBuffer dst) throws IOException { - /* - * If the channel position is before the end of cached data (i.e. a backward seek has been done before), - * use those data without downloading more data from the input stream. Maybe available data are enough. - */ - if (position < validLength) { - final int limit = dst.limit(); - final int start = dst.position(); - final int end = (int) Math.min(limit, start + (validLength - position)); - final int count; - dst.limit(end); - try { - count = file.read(dst, position); - position += count; - } finally { - dst.limit(limit); - } - return count; - } - /* - * At this point we need to download data from the input stream. - * Get a buffer that we can use with `InputStream.read(byte[])`. - * It must be a buffer backed by a Java array. - */ - final ByteBuffer buffer; - if (dst.hasArray()) { - buffer = dst; - } else { - if (transfer == null) { - transfer = ByteBuffer.allocate(BUFFER_SIZE); - } - buffer = transfer; - buffer.clear(); - buffer.limit(dst.remaining()); - } - /* - * Transfer bytes from the input stream to the buffer. - * The bytes are also copied to the temporary file. - */ - final int limit = buffer.limit(); - final int start = buffer.position(); - final int count = input.read(buffer.array(), buffer.arrayOffset() + start, limit - start); - if (count > 0) { - buffer.limit(start + count); - try { - cache(buffer); - } finally { - buffer.limit(limit); - } - /* - * If we used a temporary buffer, transfer to the destination buffer. - */ - if (buffer != dst) { - buffer.flip(); - dst.put(buffer); - } - position += count; - } - return count; - } - - /** - * Writes fully the given buffer in the cache {@linkplain #file}. - * The data to write starts at current buffer position and stops at the buffer limit. 
- */ - private void cache(final ByteBuffer buffer) throws IOException { - do { - if (file.write(buffer) == 0) { - // Should never happen, but check anyway as a safety against never-ending loop. - throw new IOException(); - } - validLength = file.position(); - } while (buffer.hasRemaining()); - } - - /** - * Attempts to write up to <i>r</i> bytes to the channel, - * where <i>r</i> is the number of bytes remaining in the buffer. - * Bytes are read from the given buffer starting at the current position. - * Upon return, the buffer position is advanced by the number of bytes written. + * @param path path to the S3 file to open. + * @throws IOException if the temporary file can not be created. */ - @Override - public int write(final ByteBuffer src) throws IOException { - throw new IOException("Not supported yet."); + CachedByteChannel(final KeyPath path) throws IOException { + super("S3-"); + this.path = path; } /** - * Returns this channel's position. + * Returns the filename to use in error messages. */ @Override - public synchronized long position() { - return position; + protected String filename() { + return path.getFileName().toString(); } /** - * Sets this channel's position. + * Creates an input stream which provides the bytes to read starting at the specified position. * - * @param newPosition number of bytes from the beginning to the desired position. - * @return {@code this} for method call chaining. - * @throws IOException if an I/O error occurs. + * @param start position of the first byte to read (inclusive). + * @param end position of the last byte to read with the returned stream (inclusive), + * or {@link Long#MAX_VALUE} for end of stream. + * @return contains the input stream providing the bytes to read starting at the given start position. 
*/ @Override - public synchronized SeekableByteChannel position(final long newPosition) throws IOException { - ArgumentChecks.ensurePositive("newPosition", newPosition); - long remaining = newPosition - validLength; - if (remaining > 0) { - if (transfer == null) { - transfer = ByteBuffer.allocate(BUFFER_SIZE); - } - final ByteBuffer buffer = transfer; - do { - buffer.clear(); - if (remaining < BUFFER_SIZE) { - buffer.limit((int) remaining); - } - final int count = input.read(buffer.array(), 0, buffer.limit()); - if (count <= 0) { - final Long size = input.response().contentLength(); - throw new EOFException(Errors.format(Errors.Keys.ValueOutOfRange_4, "newPosition", 0, size, newPosition)); + protected Connection openConnection(long start, long end) throws IOException { + final ResponseInputStream<GetObjectResponse> stream; + final String contentRange, acceptRanges; + final Long contentLength; + try { + GetObjectRequest.Builder builder = GetObjectRequest.builder().bucket(path.bucket).key(path.key); + final boolean hasEnd = (end > start) && (end != Long.MAX_VALUE); + if (start != 0 || hasEnd) { + final StringBuilder range = new StringBuilder(RANGES_UNIT).append('=').append(start); + if (hasEnd) { + range.append('-').append(end); // Inclusive. } - buffer.limit(count); - cache(buffer); - remaining -= count; - } while (remaining > 0); + builder = builder.range(range.toString()); + } + stream = path.fs.client().getObject(builder.build()); + final GetObjectResponse response = stream.response(); + contentLength = response.contentLength(); + contentRange = response.contentRange(); + acceptRanges = response.acceptRanges(); + } catch (SdkException e) { + throw FileService.failure(path, e); } - position = newPosition; - return this; - } - - /** - * Returns the size of the S3 file. - * - * @return number of bytes in the file. - * @throws IOException if the information is not available. 
- */ - @Override - public long size() throws IOException { - final Long size = input.response().contentLength(); - if (size != null) return size; - throw new IOException(); - } - - /** - * Truncates the file to the given size. - * - * @param size the new size in bytes. - * @throws IOException if the operation is not supported. - */ - @Override - public SeekableByteChannel truncate(long size) throws IOException { - throw new IOException("Not supported yet."); - } - - /** - * Tells whether this channel is open. - * - * @return {@code true} if this channel is open. - */ - @Override - public boolean isOpen() { // No synchronization, rely on `FileChannel` thread safety instead. - return file.isOpen(); + final List<String> arl = (acceptRanges != null) ? List.of(acceptRanges) : List.of(); + if (contentRange == null) { + final long length = (contentLength != null) ? contentLength : -1; + return new Connection(stream, 0, (length < 0) ? Long.MAX_VALUE : length, length, Connection.acceptRanges(arl)); + } + return new Connection(stream, contentRange, arl, + (contentLength != null) ? OptionalLong.of(contentLength) : OptionalLong.empty()); } /** - * Closes this channel and releases resources. + * Invoked when this channel is no longer interested in reading bytes from the specified stream. * - * @throws IOException if an error occurred while closing the channel. + * @param input the input stream to eventually close. + * @return whether the given input stream has been closed by this method. 
*/ @Override - public synchronized void close() throws IOException { - transfer = null; - try { - file.close(); - } catch (Throwable e) { - try { - input.close(); - } catch (Throwable s) { - e.addSuppressed(s); - } - throw e; + protected boolean abort(final InputStream input) throws IOException { + if (input instanceof Abortable) { + ((Abortable) input).abort(); + return true; + } else { + return super.abort(input); } - input.close(); - } - - /** - * Returns a string representation for debugging purpose. - */ - @Override - public String toString() { - return Strings.toString(getClass(), "position", position, "validLength", validLength); } } diff --git a/cloud/sis-cloud-aws/src/main/java/org/apache/sis/cloud/aws/s3/FileService.java b/cloud/sis-cloud-aws/src/main/java/org/apache/sis/cloud/aws/s3/FileService.java index 9b44d65de7..15cbb192a6 100644 --- a/cloud/sis-cloud-aws/src/main/java/org/apache/sis/cloud/aws/s3/FileService.java +++ b/cloud/sis-cloud-aws/src/main/java/org/apache/sis/cloud/aws/s3/FileService.java @@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentHashMap; import java.net.URI; import java.io.IOException; +import java.io.InputStream; import java.nio.file.Path; import java.nio.file.FileStore; import java.nio.file.FileSystem; @@ -455,20 +456,17 @@ public class FileService extends FileSystemProvider { * @throws UnsupportedOperationException if an unsupported option is specified. * @throws IOException if an I/O error occurs. */ - public ResponseInputStream<GetObjectResponse> newInputStream(final Path path, final OpenOption... options) throws IOException { + @Override + public InputStream newInputStream(final Path path, final OpenOption... 
options) throws IOException { + ensureSupported(options); final KeyPath kp = toAbsolute(path, true); - for (final OpenOption opt: options) { - if (opt == StandardOpenOption.APPEND || opt == StandardOpenOption.WRITE) { - throw new UnsupportedOperationException(Errors.format(Errors.Keys.UnsupportedArgumentValue_1, opt)); - } - } + final ResponseInputStream<GetObjectResponse> stream; try { - final ResponseInputStream<GetObjectResponse> stream = kp.fs.client().getObject( - GetObjectRequest.builder().bucket(kp.bucket).key(kp.key).build()); - return stream; + stream = kp.fs.client().getObject(GetObjectRequest.builder().bucket(kp.bucket).key(kp.key).build()); } catch (SdkException e) { throw failure(path, e); } + return stream; } /** @@ -485,9 +483,19 @@ public class FileService extends FileSystemProvider { public SeekableByteChannel newByteChannel(final Path path, final Set<? extends OpenOption> options, final FileAttribute<?>... attributes) throws IOException { - final ResponseInputStream<GetObjectResponse> stream = - newInputStream(path, options.toArray(new OpenOption[options.size()])); - return new CachedByteChannel(stream); + ensureSupported(options.toArray(OpenOption[]::new)); + return new CachedByteChannel(toAbsolute(path, true)); + } + + /** + * Ensures that the given array of options does not contain an unsupported option. 
+ */ + private static void ensureSupported(final OpenOption[] options) { + for (final OpenOption opt : options) { + if (opt == StandardOpenOption.APPEND || opt == StandardOpenOption.WRITE) { + throw new UnsupportedOperationException(Errors.format(Errors.Keys.UnsupportedArgumentValue_1, opt)); + } + } } /** diff --git a/core/sis-utility/src/main/java/org/apache/sis/internal/jdk17/JDK17.java b/core/sis-utility/src/main/java/org/apache/sis/internal/jdk17/JDK17.java index 9578b2ac21..6154d3ac4f 100644 --- a/core/sis-utility/src/main/java/org/apache/sis/internal/jdk17/JDK17.java +++ b/core/sis-utility/src/main/java/org/apache/sis/internal/jdk17/JDK17.java @@ -16,6 +16,9 @@ */ package org.apache.sis.internal.jdk17; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; import java.nio.ByteBuffer; import java.util.List; import java.util.stream.Stream; @@ -63,6 +66,22 @@ public final class JDK17 { } } + /** + * Place holder for {@link InputStream#skipNBytes(long)} method added in JDK12. + */ + public static void skipNBytes(final InputStream s, long n) throws IOException { + while (n > 0) { + long c = s.skip(n); + if (c <= 0) { + if (c < 0 || s.read() < 0) { + throw new EOFException(); + } + c = 1; + } + n -= c; + } + } + /** * Place holder for {@link Stream#toList()} method added in JDK16. */ diff --git a/core/sis-utility/src/main/java/org/apache/sis/internal/jdk17/Record.java b/core/sis-utility/src/main/java/org/apache/sis/internal/jdk17/Record.java new file mode 100644 index 0000000000..83bbe0e806 --- /dev/null +++ b/core/sis-utility/src/main/java/org/apache/sis/internal/jdk17/Record.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sis.internal.jdk17; + + +/** + * Placeholder for the {@code java.lang.Record} class introduced in Java 16. + * This is used for making transition easier when SIS will be ready to upgrade to Java 17. + * + * @author Martin Desruisseaux (Geomatys) + * @since 1.4 + * @version 1.4 + * @module + */ +public abstract class Record { + /** + * Creates a new record. + */ + protected Record() { + } +} diff --git a/core/sis-utility/src/main/java/org/apache/sis/util/collection/RangeSet.java b/core/sis-utility/src/main/java/org/apache/sis/util/collection/RangeSet.java index 5ce8e281ce..b1e07b8de9 100644 --- a/core/sis-utility/src/main/java/org/apache/sis/util/collection/RangeSet.java +++ b/core/sis-utility/src/main/java/org/apache/sis/util/collection/RangeSet.java @@ -23,6 +23,7 @@ import java.io.ObjectOutputStream; import java.io.ObjectStreamException; import java.lang.reflect.Array; import java.util.Arrays; +import java.util.Objects; import java.util.Iterator; import java.util.Comparator; import java.util.SortedSet; @@ -101,7 +102,7 @@ import static org.apache.sis.util.Numbers.*; * * @author Martin Desruisseaux (Geomatys) * @author Rémi Maréchal (Geomatys) - * @version 0.5 + * @version 1.4 * * @param <E> the type of range elements. * @@ -435,6 +436,9 @@ public class RangeSet<E extends Comparable<? 
super E>> extends AbstractSet<Range * @param upper index after the last value to examine. */ final int binarySearch(final E value, final int lower, final int upper) { + if (array == null || value == null) { + return -1; + } switch (elementCode) { case DOUBLE: return Arrays.binarySearch((double[]) array, lower, upper, (Double) value); case FLOAT: return Arrays.binarySearch((float[]) array, lower, upper, (Float) value); @@ -526,7 +530,7 @@ public class RangeSet<E extends Comparable<? super E>> extends AbstractSet<Range * is not present neither in this set (i1 < 0) and if its insertion point is the * same, then insert the new range in the space between two existing ranges. */ - if (i0 == ~i1) { // Includes the (i0 == length) case. + if (i0 == ~i1) { // Includes the (i0 == length) case. ensureOrdered(minValue, maxValue); insertAt(i0, minValue, maxValue); return true; @@ -551,9 +555,9 @@ public class RangeSet<E extends Comparable<? super E>> extends AbstractSet<Range * minValue: │ (insertion point i0 == 3) * moving i0: ├────┘ */ - i0 &= ~1; // Equivalent to executing i0-- only when i0 is odd. + i0 &= ~1; // Equivalent to executing i0-- only when i0 is odd. if (i1 < 0) { - i1 = ~i1; // Really tild operator, not minus sign. + i1 = ~i1; // Really tild operator, not minus sign. if ((i1 & 1) == 0) { /* * If the "insertion point" is outside any existing range, expand the previous @@ -646,7 +650,7 @@ public class RangeSet<E extends Comparable<? super E>> extends AbstractSet<Range public boolean remove(final E minValue, final E maxValue) throws IllegalArgumentException { ArgumentChecks.ensureNonNull("minValue", minValue); ArgumentChecks.ensureNonNull("maxValue", maxValue); - if (length == 0) return false; // Nothing to do if no data. + if (length == 0) return false; // Nothing to do if no data. ensureOrdered(minValue, maxValue); // Search insertion index. @@ -1045,7 +1049,7 @@ public class RangeSet<E extends Comparable<? 
super E>> extends AbstractSet<Range if (lower < 0) { lower = ~lower; } - lower &= ~1; // Force the index to even value. + lower &= ~1; // Force the index to even value. } if (maxValue != null) { upper = binarySearch(maxValue, lower, upper); @@ -1053,8 +1057,8 @@ public class RangeSet<E extends Comparable<? super E>> extends AbstractSet<Range upper = ~upper; } /* - * If 'upper' is even (i.e. is the index of a minimal value), keep that index - * unchanged because this value is exclusive. But if 'upper' is odd (i.e. is + * If `upper` is even (i.e. is the index of a minimal value), keep that index + * unchanged because this value is exclusive. But if `upper` is odd (i.e. is * the index of a maximal value), move to the minimal value of the next range. */ upper = (upper + 1) & ~1; @@ -1398,7 +1402,7 @@ public class RangeSet<E extends Comparable<? super E>> extends AbstractSet<Range * Found an insertion point. Make sure that the insertion * point is inside a range (i.e. before the maximum value). */ - index = ~index; // Tild sign, not minus. + index = ~index; // Tild sign, not minus. if ((index & 1) == 0) { return -1; } @@ -1406,8 +1410,7 @@ public class RangeSet<E extends Comparable<? super E>> extends AbstractSet<Range // The value is equal to an excluded endpoint. return -1; } - index /= 2; // Round toward 0 (odd index are maximum values). - return index; + return index >>> 1; // Round toward 0 (odd index are maximum values). } /** @@ -1417,15 +1420,12 @@ public class RangeSet<E extends Comparable<? super E>> extends AbstractSet<Range * Widening conversions are performed as needed. * * @param index the range index, from 0 inclusive to {@link #size() size} exclusive. - * @return the minimum value for the range at the specified index, inclusive. + * @return the minimum value for the range at the specified index. * @throws IndexOutOfBoundsException if {@code index} is out of bounds. * @throws ClassCastException if range elements are not convertible to {@code long}. 
*/ - public long getMinLong(int index) throws IndexOutOfBoundsException, ClassCastException { - if ((index *= 2) >= length) { - throw new IndexOutOfBoundsException(); - } - return Array.getLong(array, index); + public long getMinLong(final int index) throws IndexOutOfBoundsException, ClassCastException { + return Array.getLong(array, Objects.checkIndex(index, length >>> 1) << 1); } /** @@ -1435,17 +1435,14 @@ public class RangeSet<E extends Comparable<? super E>> extends AbstractSet<Range * Widening conversions are performed as needed. * * @param index the range index, from 0 inclusive to {@link #size() size} exclusive. - * @return the minimum value for the range at the specified index, inclusive. + * @return the minimum value for the range at the specified index. * @throws IndexOutOfBoundsException if {@code index} is out of bounds. * @throws ClassCastException if range elements are not convertible to numbers. * * @see org.apache.sis.measure.NumberRange#getMinDouble() */ - public double getMinDouble(int index) throws IndexOutOfBoundsException, ClassCastException { - if ((index *= 2) >= length) { - throw new IndexOutOfBoundsException(); - } - return Array.getDouble(array, index); + public double getMinDouble(final int index) throws IndexOutOfBoundsException, ClassCastException { + return Array.getDouble(array, Objects.checkIndex(index, length >>> 1) << 1); } /** @@ -1455,15 +1452,12 @@ public class RangeSet<E extends Comparable<? super E>> extends AbstractSet<Range * Widening conversions are performed as needed. * * @param index the range index, from 0 inclusive to {@link #size() size} exclusive. - * @return the maximum value for the range at the specified index, inclusive. + * @return the maximum value for the range at the specified index. * @throws IndexOutOfBoundsException if {@code index} is out of bounds. * @throws ClassCastException if range elements are not convertible to {@code long}. 
*/ - public long getMaxLong(int index) throws IndexOutOfBoundsException, ClassCastException { - if ((index *= 2) >= length) { - throw new IndexOutOfBoundsException(); - } - return Array.getLong(array, index + 1); + public long getMaxLong(final int index) throws IndexOutOfBoundsException, ClassCastException { + return Array.getLong(array, (Objects.checkIndex(index, length >>> 1) << 1) | 1); } /** @@ -1473,17 +1467,14 @@ public class RangeSet<E extends Comparable<? super E>> extends AbstractSet<Range * Widening conversions are performed as needed. * * @param index the range index, from 0 inclusive to {@link #size size} exclusive. - * @return the maximum value for the range at the specified index, exclusive. + * @return the maximum value for the range at the specified index. * @throws IndexOutOfBoundsException if {@code index} is out of bounds. * @throws ClassCastException if range elements are not convertible to numbers. * * @see org.apache.sis.measure.NumberRange#getMaxDouble() */ - public double getMaxDouble(int index) throws IndexOutOfBoundsException, ClassCastException { - if ((index *= 2) >= length) { - throw new IndexOutOfBoundsException(); - } - return Array.getDouble(array, index + 1); + public double getMaxDouble(final int index) throws IndexOutOfBoundsException, ClassCastException { + return Array.getDouble(array, (Objects.checkIndex(index, length >>> 1) << 1) | 1); } /** diff --git a/core/sis-utility/src/main/java/org/apache/sis/util/collection/package-info.java b/core/sis-utility/src/main/java/org/apache/sis/util/collection/package-info.java index 1dcb2fc8d2..7b25905c19 100644 --- a/core/sis-utility/src/main/java/org/apache/sis/util/collection/package-info.java +++ b/core/sis-utility/src/main/java/org/apache/sis/util/collection/package-info.java @@ -51,7 +51,7 @@ * </ul> * * @author Martin Desruisseaux (IRD, Geomatys) - * @version 1.3 + * @version 1.4 * @since 0.3 * @module */ diff --git a/pom.xml b/pom.xml index 3e85c4c59f..1d1a67e75c 100644 --- 
a/pom.xml +++ b/pom.xml @@ -39,7 +39,7 @@ <parent> <groupId>org.apache</groupId> <artifactId>apache</artifactId> - <version>28</version> + <version>29</version> </parent> diff --git a/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/ChannelData.java b/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/ChannelData.java index cc4150ea86..73cd926b3e 100644 --- a/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/ChannelData.java +++ b/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/ChannelData.java @@ -327,7 +327,8 @@ public abstract class ChannelData implements Markable { } /** - * Moves to the given position in the stream, relative to the stream position at construction time. + * Moves to the given position in the stream. The given position is relative to + * the position that the stream had at {@code ChannelData} construction time. * * @param position the position where to move. * @throws IOException if the stream cannot be moved to the given position. diff --git a/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/ChannelDataInput.java b/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/ChannelDataInput.java index 99e54c8be8..45802f35ff 100644 --- a/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/ChannelDataInput.java +++ b/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/ChannelDataInput.java @@ -60,7 +60,7 @@ import static org.apache.sis.util.ArgumentChecks.ensureBetween; * {@link javax.imageio} is needed. * * @author Martin Desruisseaux (Geomatys) - * @version 1.3 + * @version 1.4 * @since 0.3 * @module */ @@ -899,7 +899,8 @@ public class ChannelDataInput extends ChannelData { } /** - * Moves to the given position in the stream, relative to the stream position at construction time. + * Moves to the given position in the stream. 
The given position is relative to + * the position that the stream had at {@code ChannelDataInput} construction time. * * @param position the position where to move. * @throws IOException if the stream cannot be moved to the given position. diff --git a/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/FileCacheByteChannel.java b/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/FileCacheByteChannel.java new file mode 100644 index 0000000000..d118eba197 --- /dev/null +++ b/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/FileCacheByteChannel.java @@ -0,0 +1,637 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sis.internal.storage.io; + +import java.util.OptionalLong; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.nio.file.StandardOpenOption; +import java.nio.channels.FileChannel; +import java.nio.channels.SeekableByteChannel; +import java.nio.channels.NonWritableChannelException; +import org.apache.sis.internal.storage.Resources; +import org.apache.sis.internal.util.Strings; +import org.apache.sis.util.ArgumentChecks; +import org.apache.sis.util.ArraysExt; +import org.apache.sis.util.CharSequences; +import org.apache.sis.util.resources.Errors; +import org.apache.sis.util.collection.RangeSet; + + +/** + * A seekable byte channel which copies data from an input stream to a temporary file. + * This class can be used for wrapping HTTP or S3 connections for use with {@link ChannelDataInput}. + * Characteristics: + * + * <ul> + * <li>Bytes read from the input stream are cached in a temporary file for making backward seeks possible.</li> + * <li>The number of bytes of interest {@linkplain #position(long, long) can be specified}. + * It makes possible to specify the range of bytes to download with HTTP connections.</li> + * <li>This implementation is thread-safe.</li> + * <li>Current implementation is read-only.</li> + * </ul> + * + * @author Martin Desruisseaux (Geomatys) + * @version 1.4 + * @since 1.4 + * @module + */ +public abstract class FileCacheByteChannel implements SeekableByteChannel { + /** + * Size of the transfer buffer, in number of bytes. + */ + private static final int BUFFER_SIZE = 8 * 1024; + + /** + * Threshold for implementing a change of position by closing current connection and opening a new one. + * If the number of bytes to skip is smaller than this threshold, then we will rather continue reading + * with the current input stream. 
+ * + * <p>For an average download speed of 25 Mb/s, downloading 64 kB requires about 1/50 of second.</p> + */ + static final int SKIP_THRESHOLD = 64 * 1024; + + /** + * The unit of ranges used in HTTP connections. + */ + protected static final String RANGES_UNIT = "bytes"; + + /** + * Information about an input stream and its range of bytes. + * This is the return value of {@link #openConnection(long, long)}. + */ + protected static final class Connection extends org.apache.sis.internal.jdk17.Record { + /** The input stream for reading the bytes. */ + final InputStream input; + + /** Position of the first byte read by the input stream (inclusive). */ + final long start; + + /** Position of the last byte read by the input stream (inclusive). */ + final long end; + + /** Total length of the stream, or -1 is unknown. */ + final long length; + + /** Whether connection can be created for ranges of bytes. */ + final boolean acceptRanges; + + /** + * Creates information about a connection. + * + * @param input the input stream for reading the bytes. + * @param start position of the first byte read by the input stream (inclusive). + * @param end position of the last byte read by the input stream (inclusive). + * @param length total length of the stream, or -1 is unknown. + * @param acceptRanges whether connection can be created for ranges of bytes. + * + * @see #openConnection(long, long) + */ + public Connection(final InputStream input, final long start, final long end, final long length, final boolean acceptRanges) { + this.input = input; + this.start = start; + this.end = end; + this.length = length; + this.acceptRanges = acceptRanges; + } + + /** + * Creates information about a connection by parsing HTTP header. + * Example: "Content-Range: bytes 25000-75000/100000". + * + * @param input the input stream for reading the bytes. + * @param contentRange value of "Content-Range" in HTTP header. + * @param acceptRanges value of "Accept-Ranges" in HTTP header. 
+ * @param contentLength total length of the stream. + * @throws IllegalArgumentException if the start, end of length cannot be parsed. + */ + public Connection(final InputStream input, String contentRange, final Iterable<String> acceptRanges, + final OptionalLong contentLength) + { + this.input = input; + contentRange = contentRange.trim(); + int s = contentRange.indexOf(' '); + if (s >= 0 && (s != RANGES_UNIT.length() || !contentRange.regionMatches(true, 0, RANGES_UNIT, 0, s))) { + throw new IllegalArgumentException(Errors.format(Errors.Keys.UnsupportedArgumentValue_1, contentRange)); + } + int rs = contentRange.indexOf('-', ++s); // Index of range separator. + int ls = contentRange.indexOf('/', Math.max(s, rs+1)); // Index of length separator. + if (contentLength.isPresent()) { + length = contentLength.getAsLong(); + } else if (ls >= 0) { + String t = contentRange.substring(ls+1).trim(); + length = t.equals("*") ? -1 : Long.parseLong(t); + } else { + length = -1; + } + if (ls < 0) ls = contentRange.length(); + if (rs < 0) rs = ls; + start = Long.parseLong(contentRange.substring(s, rs).trim()); + end = (rs < ls) ? Long.parseLong(contentRange.substring(rs+1, ls).trim()) : length; + this.acceptRanges = acceptRanges(acceptRanges); + } + + /** + * Returns {@code true} if the given "Accept-Ranges" values contains at least one "bytes" string. + * + * @param values HTTP header value for "Accept-Ranges". + * @return whether the values contains at least one "bytes" string. + */ + public static boolean acceptRanges(final Iterable<String> values) { + for (final String t : values) { + if (ArraysExt.containsIgnoreCase((String[]) CharSequences.split(t, ','), RANGES_UNIT)) { + return true; + } + } + return false; + } + + /** + * Returns a string representation for debugging purposes. 
+ */ + @Override + public String toString() { + return Strings.toString(getClass(), "start", start, "end", end); + } + } + + /** + * The source of bytes to read, or {@code null} if the connection has not yet been established. + * The stream can be closed and replaced by another stream if various connections are opened + * for downloading various ranges of bytes. When a new stream is created, the position of the + * {@linkplain #file} channel shall be synchronized with the input stream position (taking in + * account the start of the range). + * + * @see #openConnection() + * @see #openConnection(long, long) + * @see #abort(InputStream) + */ + private Connection connection; + + /** + * Input/output channel on the temporary or cached file where data are copied. + * The {@linkplain FileChannel#position() position of this file channel} shall + * be the current position of the {@linkplain Connection#input input stream}. + * + * <h4>Space consumption</h4> + * This channel should be opened on a sparse file. + * To check if a file is sparse, the outputs of following shell commands can be compared: + * + * {@preformat shell + * ls -l the-temporary-file + * du --block-size=1 the-temporary-file + * } + * + * @see <a href="https://en.wikipedia.org/wiki/Sparse_file">Sparse file on Wikipedia</a> + */ + private final FileChannel file; + + /** + * A temporary buffer for transferring data when we cannot write directly in the destination buffer. + * It shall be a buffer backed by a Java array, not a direct buffer. Created when first needed. + */ + private ByteBuffer transfer; + + /** + * Current position of this channel. The first byte of this channel is always at position zero. + * + * @see #position() + */ + private long position; + + /** + * Position after the last requested byte, or ≤ {@linkplain #position} if unknown. + * It can be used for specifying the range of bytes to download from an HTTP connection. 
+ * + * @see #position(long, long) + */ + private long endOfInterest; + + /** + * Ranges of bytes in the {@linkplain #file} where data are valid. + */ + private final RangeSet<Long> rangesOfAvailableBytes; + + /** + * Number of bytes in the full stream, or 0 if not yet computed. + */ + private long length; + + /** + * Creates a new channel which will cache bytes in a temporary file. + * The source of bytes will be provided by {@link #openConnection(long, long)}. + * + * @param prefix prefix of the temporary file to create. + * @throws IOException if the temporary file can not be created. + */ + protected FileCacheByteChannel(final String prefix) throws IOException { + rangesOfAvailableBytes = RangeSet.create(Long.class, true, false); + file = FileChannel.open(Files.createTempFile(prefix, null), + StandardOpenOption.READ, + StandardOpenOption.WRITE, + StandardOpenOption.SPARSE, + StandardOpenOption.TRUNCATE_EXISTING, + StandardOpenOption.DELETE_ON_CLOSE); + } + + /** + * Returns the filename to use in error messages. + * + * @return a filename for error messages. + */ + protected abstract String filename(); + + /** + * Creates an input stream which provides the bytes to read starting at the specified position. + * If the caller needs only a sub-range of bytes, then the end of the desired range is specified. + * That end is only a hint and can be ignored. + * + * @param start position of the first byte to read (inclusive). + * @param end position of the last byte to read with the returned stream (inclusive), + * or {@link Long#MAX_VALUE} for end of stream. + * @return information about the input stream providing the bytes to read starting at the given start position. + * @throws IOException if the connection can not be established. + */ + protected abstract Connection openConnection(long start, long end) throws IOException; + + /** + * Invoked when this channel is no longer interested in reading bytes from the specified stream. 
+ * This method is invoked for example when this channel needs to skip an arbitrarily large number + * of bytes because the {@linkplain #position(long) position changed}. The {@code input} argument + * is the value in the record returned by a previous call to {@link #openConnection(long, long)}. + * The boolean return value tells what this method has done: + * + * <ul class="verbose"> + * <li>If this method returns {@code true}, then the given stream has been closed by this method and this + * channel is ready to create a new stream on the next call to {@link #openConnection(long, long)}.</li> + * <li>If this method returns {@code false}, then the given stream is still alive and should continue to be used. + * The {@link #openConnection(long, long)} method will <em>not</em> be invoked. + * Instead, bytes will be skipped by reading them from the current input stream and caching them.</li> + * </ul> + * + * @param input the input stream to eventually close. + * @return whether the given input stream has been closed by this method. If {@code false}, + * then this channel should continue to use that input stream instead of opening a new connection. + * @throws IOException if an error occurred while closing the stream or preparing for next read operations. + */ + protected boolean abort(InputStream input) throws IOException { + return false; + } + + /** + * Returns the number of bytes in the input stream. + * + * @return number of bytes in the input stream. + * @throws IOException if the information is not available. + */ + @Override + public synchronized long size() throws IOException { + return length; + } + + /** + * Returns this channel's position. + * The first byte read by this channel is always at position zero. + * + * @return the current channel position. + */ + @Override + public synchronized long position() { + return position; + } + + /** + * Sets this channel's position. + * + * @param newPosition number of bytes from the beginning to the desired position. 
+ * @return {@code this} for method call chaining. + * @throws IOException if an I/O error occurs. + */ + @Override + public synchronized SeekableByteChannel position(final long newPosition) throws IOException { + ArgumentChecks.ensurePositive("newPosition", newPosition); + position = newPosition; + if (endOfInterest - newPosition < SKIP_THRESHOLD) { + endOfInterest = 0; // Read until end of stream. + } + return this; + } + + /** + * Sets this channel's position together with the number of bytes to read. + * The number of bytes is only a hint and may be ignored, depending on subclasses. + * Reading more bytes than specified is okay, only potentially less efficient. + * + * @param newPosition number of bytes from the beginning to the desired position. + * @param count expected number of bytes to read. + * @throws IOException if an I/O error occurs. + */ + final synchronized void position(final long newPosition, final long count) throws IOException { + ArgumentChecks.ensurePositive("newPosition", newPosition); + ArgumentChecks.ensureStrictlyPositive("count", count); + position = newPosition; + endOfInterest = newPosition + count; // Overflow is okay here (will read until end of stream). + } + + /** + * Opens a connection on the range of bytes determined by the current channel position. + * The {@link #endOfInterest} position is considered unspecified if not greater than + * {@link #position} (it may be 0). + * + * @return the opened connection (never {@code null}). + * @throws IOException if the connection can not be established. + */ + private Connection openConnection() throws IOException { + long end = endOfInterest; + if (end > position) end--; // Make inclusive. + else end = Long.MAX_VALUE; + var c = openConnection(position, end); + file.position(c.start); + if (c.length >= 0) { + length = c.length; + } + connection = c; // Set only on success. + return c; + } + + /** + * Returns a buffer for transferring bytes from the input stream to the cache. 
+ * This buffer must be backed by a Java array (not a direct buffer). + */ + private ByteBuffer transfer() { + if (transfer == null) { + transfer = ByteBuffer.allocate(BUFFER_SIZE); + } + return transfer; + } + + /** + * Tries to move the input stream by skipping the specified amount of bytes. + * This method is invoked when the source of input streams (the server) does not support ranges, + * or when the number of bytes to skip is too small for being worth to create a new connection. + * This method may skip less bytes than requested. The skipped bytes are saved in the cache. + * + * @param count number of bytes to skip. + * @return remaining number of bytes to skip after this method execution. + * @throws IOException if an I/O error occurred. + */ + private long skipInInput(long count) throws IOException { + if (count < 0) { + throw new IOException(Resources.format(Resources.Keys.StreamIsReadOnce_1, filename())); + } else if (count != 0) { + final InputStream input = connection.input; + final ByteBuffer buffer = transfer(); + do { + buffer.clear(); + if (count < BUFFER_SIZE) { + buffer.limit((int) count); + } + int n = input.read(buffer.array(), 0, buffer.limit()); + if (n <= 0) { + if (n != 0 || (n = input.read()) < 0) { // Block until we get one byte. + break; // End of stream, but maybe it was a sub-range. + } + buffer.put((byte) n); + n = 1; + } + cache(buffer.limit(n)); + count -= n; + } while (count > 0); + } + return count; + } + + /** + * Attempts to read up to <i>r</i> bytes from the channel, + * where <i>r</i> is the number of bytes remaining in the buffer. + * Bytes are written in the given buffer starting at the current position. + * Upon return, the buffer position is advanced by the number of bytes read. + * + * @param dst the buffer where to store the bytes that are read. + * @return number of bytes read, or -1 if end of file. + * @throws IOException if an error occurred when reading the bytes. 
+ */ + @Override + public synchronized int read(final ByteBuffer dst) throws IOException { + /* + * If the channel position is inside a range of cached data (i.e. a backward seek has been done before), + * use those data without downloading more data from the input stream. Maybe available data are enough. + */ + int count = readFromCache(dst); + if (count >= 0) { + return count; + } + /* + * If we reach this point, the byte at `position` is not in the cache. + * If a connection exists, we need to either discard it or skip bytes. + */ + Connection c = connection; + long offset = position - file.position(); + if (offset != 0 && c != null) { + if ((offset < 0 || (c.acceptRanges && (offset >= SKIP_THRESHOLD || position > c.end)))) { + offset -= drainAndAbort(); + c = connection; + } + } + /* + * At this point we need to download data from the input stream. + * If previous connection can not be used, open a new one for the range of bytes to read. + * Then skip all bytes between the current position and the requested position. + * Those bytes will be saved in the cache. + */ + if (c == null) { + c = openConnection(); + offset = position - c.start; + } + offset = skipInInput(offset); + if (offset != 0) { + count = readFromCache(dst); + if (count >= 0) { + return count; + } + throw new EOFException(Errors.format(Errors.Keys.ValueOutOfRange_4, "position", 0, length, position)); + } + /* + * Get a buffer that we can use with `InputStream.read(byte[])`. + * It must be a buffer backed by a Java array. + */ + final ByteBuffer buffer; + if (dst.hasArray()) { + buffer = dst; + } else { + buffer = transfer(); + buffer.clear().limit(dst.remaining()); + } + /* + * Transfer bytes from the input stream to the buffer. + * The bytes are also copied to the temporary file. 
+ */ + final int limit = buffer.limit(); + final int start = buffer.position(); + count = c.input.read(buffer.array(), Math.addExact(buffer.arrayOffset(), start), buffer.remaining()); + if (count > 0) { + try { + cache(buffer.limit(start + count)); + } finally { + buffer.limit(limit); + } + if (buffer != dst) { + dst.put(buffer.flip()); // Transfer temporary to destination buffer. + } + position += count; + } + return count; + } + + /** + * Attempts to read up to bytes from the cache. + * + * @param dst the buffer where to store the bytes that are read. + * @return number of bytes read, or -1 if the cache does not contain the requested range of bytes. + * @throws IOException if an error occurred when reading the bytes. + */ + private int readFromCache(final ByteBuffer dst) throws IOException { + final int indexOfRange = rangesOfAvailableBytes.indexOfRange(position); + if (indexOfRange < 0) { + return -1; + } + final long endOfCache = rangesOfAvailableBytes.getMaxLong(indexOfRange); + final int limit = dst.limit(); + final int start = dst.position(); + final int end = (int) Math.min(limit, start + (endOfCache - position)); + final int count; + try { + count = file.read(dst.limit(end), position); + position += count; + } finally { + dst.limit(limit); + } + return count; + } + + /** + * Writes fully the given buffer in the cache {@linkplain #file}. + * The data to write starts at current buffer position and stops at the buffer limit. + * This method changes the {@link Connection#input} and {@link #file} positions by the same amount. + * + * @param buffer buffer containing data to cache. + */ + private void cache(final ByteBuffer buffer) throws IOException { + do { + final long start = file.position(); + final int count = file.write(buffer); + if (count <= 0) { + // Should never happen, but check anyway as a safety against never-ending loop. + throw new IOException(); + } + long end = start + count; + if (end < start) end = Long.MAX_VALUE; // Safety against overflow. 
+ rangesOfAvailableBytes.add(start, end); + } while (buffer.hasRemaining()); + } + + /** + * Reads and caches the bytes that are already available in the input stream, then aborts download. + * This method may set {@link #connection} to null. + * + * @return number of bytes that have been read. + * @throws IOException if an I/O error occurred. + */ + private long drainAndAbort() throws IOException { + long count = 0; + final InputStream input = connection.input; + for (int c; (c = input.available()) > 0;) { + final ByteBuffer buffer = transfer(); + buffer.clear(); + if (c < BUFFER_SIZE) buffer.limit(c); + final int n = input.read(buffer.array(), 0, buffer.limit()); + if (n < 0) break; + cache(buffer.limit(n)); + count += n; + } + if (abort(input)) { + connection = null; + } + return count; + } + + /** + * Attempts to write up to <i>r</i> bytes to the channel, + * where <i>r</i> is the number of bytes remaining in the buffer. + * Bytes are read from the given buffer starting at the current position. + * Upon return, the buffer position is advanced by the number of bytes written. + * + * <p>The default implementation throws {@link IOException}.</p> + * + * @param src the buffer containing the bytes to write. + * @return number of bytes actually written. + * @throws IOException if an error occurred while writing the bytes. + */ + @Override + public int write(final ByteBuffer src) throws IOException { + throw new NonWritableChannelException(); + } + + /** + * Truncates the file to the given size. + * + * @param size the new size in bytes. + * @throws IOException if the operation is not supported. + */ + @Override + public SeekableByteChannel truncate(long size) throws IOException { + throw new NonWritableChannelException(); + } + + /** + * Tells whether this channel is open. + * + * @return {@code true} if this channel is open. + */ + @Override + public boolean isOpen() { // No synchronization, rely on `FileChannel` thread safety instead. 
+ return file.isOpen(); + } + + /** + * Closes this channel and releases resources. + * + * @throws IOException if an error occurred while closing the channel. + */ + @Override + public synchronized void close() throws IOException { + final Connection c = connection; + connection = null; + transfer = null; + try (file) { + if (c != null && !abort(c.input)) { + c.input.close(); + } + } + } + + /** + * Returns a string representation for debugging purpose. + */ + @Override + public String toString() { + return Strings.toString(getClass(), "filename", filename(), "position", position, "rangeCount", rangesOfAvailableBytes.size()); + } +} diff --git a/storage/sis-storage/src/test/java/org/apache/sis/internal/storage/io/ComputedInputStream.java b/storage/sis-storage/src/test/java/org/apache/sis/internal/storage/io/ComputedInputStream.java new file mode 100644 index 0000000000..953b357a6f --- /dev/null +++ b/storage/sis-storage/src/test/java/org/apache/sis/internal/storage/io/ComputedInputStream.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sis.internal.storage.io; + +import java.util.Random; +import java.io.InputStream; + +import static org.junit.Assert.*; + + +/** + * An input stream where each value is computed from the stream position. + * The byte values are the 1, 2, 3, …, 100, -1, -2, -3, … -100 series repeated indefinitely. + * + * @author Martin Desruisseaux (Geomatys) + * @version 1.4 + * @since 1.4 + * @module + */ +final strictfp class ComputedInputStream extends InputStream { + /** + * Number of bytes in this stream. + */ + private final int length; + + /** + * The current stream position. + */ + private int position; + + /** + * Random value to be returned by {@link #available()}. + */ + private int available; + + /** + * Whether this input stream has been closed. + */ + private boolean closed; + + /** + * Generator of random numbers for controlling the behavior of this stream. + */ + private final Random random; + + /** + * Creates a new input stream of the given length. + * + * @param start position of the first byte to read. + * @param end position after the last byte to read. + * @param rg generator of random numbers for controlling the behavior of this stream. + */ + ComputedInputStream(final int start, final int end, final Random rg) { + assertTrue(start >= 0); + assertTrue(start <= end); + position = start; + length = end; + random = rg; + } + + /** + * Returns the value at the given position. + * + * @param position the stream position where to get a value. + * @return value at the specified stream position. + */ + static byte valueAt(final int position) { + int i = (position % 200) + 1; + if (i > 100) i = 100 - i; + return (byte) i; + } + + /** + * Reads the next byte of data from the input stream. + */ + @Override + public int read() { + assertFalse("closed", closed); + if (available != 0) available--; + return (position < length) ? 
Byte.toUnsignedInt(valueAt(position++)) : -1; + } + + /** + * Reads up to {@code length} bytes of data from the input stream into an array of bytes. + * This method randomly read a smaller number of bytes. + * + * @param bytes the buffer into which the data is read. + * @param offseet the start offset at which the data is written. + * @param count the maximum number of bytes to read. + * @return the total number of bytes read into the buffer, or {@code -1} on EOF. + */ + @Override + public int read(final byte[] bytes, int offset, int count) { + assertFalse("closed", closed); + assertNotNull(bytes); + assertTrue("Negative count", count >= 0); + assertTrue("Nagative offset", offset >= 0); + assertTrue("Out of bounds", offset + count <= bytes.length); + if (position >= length) { + return -1; + } + if (count != 0) { + final int end = Math.min(offset + random.nextInt(count) + 1, length); + count = end - offset; + while (offset < end) { + bytes[offset++] = valueAt(position++); + } + } + if (available >= count) { + available -= count; + } else { + available = random.nextInt(100); + } + return count; + } + + /** + * Returns an estimate of the number of bytes that can be read without blocking. + * + * @return an estimate of the number of bytes available. + */ + @Override + public int available() { + assertFalse("closed", closed); + return available; + } + + /** + * Marks this input stream as closed. + */ + @Override + public void close() { + closed = true; + } +} diff --git a/storage/sis-storage/src/test/java/org/apache/sis/internal/storage/io/FileCacheByteChannelTest.java b/storage/sis-storage/src/test/java/org/apache/sis/internal/storage/io/FileCacheByteChannelTest.java new file mode 100644 index 0000000000..e33951c6b2 --- /dev/null +++ b/storage/sis-storage/src/test/java/org/apache/sis/internal/storage/io/FileCacheByteChannelTest.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sis.internal.storage.io; + +import java.util.Random; +import java.util.OptionalLong; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.List; +import org.apache.sis.test.TestCase; +import org.apache.sis.test.TestUtilities; +import org.junit.Test; + +import static org.junit.Assert.*; + + +/** + * Tests {@link FileCacheByteChannel}. + * + * @author Martin Desruisseaux (Geomatys) + * @version 1.4 + * @since 1.4 + * @module + */ +public final strictfp class FileCacheByteChannelTest extends TestCase { + /** + * The implementation used for testing purpose. + */ + private static final class Implementation extends FileCacheByteChannel { + /** + * Name of the test method. Used for error messages only. + */ + private final String name; + + /** + * Number of bytes in the input stream to use for testing purpose. + * It should be large enough for forcing {@link #position(long, long)} + * to skip bytes instead of reading them. + */ + private final int length; + + /** + * Generator of random numbers for controlling the behavior of this channel. + */ + private final Random random; + + /** + * Creates a new test channel. + * + * @param test a name to use for identifying the test in error messages. 
+ * @param rg generator of random numbers for controlling the behavior of this channel. + * @throws IOException if the temporary file can not be created. + */ + Implementation(final String test, final Random rg) throws IOException { + super("Test-"); + name = test; + length = SKIP_THRESHOLD * 10 + rg.nextInt(SKIP_THRESHOLD * 10); + random = rg; + } + + /** + * Returns a name to use in error messages. + */ + @Override + protected String filename() { + return name; + } + + /** + * Creates an input stream which provides the bytes to read starting at the specified position. + * + * @param start position of the first byte to read (inclusive). + * @param end position of the last byte to read with the returned stream (inclusive), + * or {@link Long#MAX_VALUE} for end of stream. + * @return contains the input stream providing the bytes to read. + */ + @Override + protected Connection openConnection(long start, long end) { + assertTrue(end >= 0); + if (end >= length) end = length - 1; + start = Math.max(start - random.nextInt(40), 0); + end = Math.min(end + random.nextInt(40), length - 1); // Inclusive. + var input = new ComputedInputStream(Math.toIntExact(start), Math.toIntExact(end), random); + return new Connection(input, start, end, length, true); + } + + /** + * Marks the given input stream as closed and notify that a new one can be created. + */ + @Override + protected boolean abort(final InputStream input) throws IOException { + input.close(); + return true; + } + + /** + * Reads the next bytes from the channel and stores them in a random region of the given buffer. + * On return, the buffer position is set on the first byte read and the buffer limit is set after + * the last byte read. + * + * @param dst the buffer where to store the bytes that are read. + * @return {@code true} if bytes have been read, or {@code false} on EOF. + * @throws IOException if an error occurred when reading or writing to the temporary file. 
+ */ + final boolean readInRandomRegion(final ByteBuffer dst) throws IOException { + int start = random.nextInt(dst.capacity()); + int end = random.nextInt(dst.capacity()); + if (start > end) { + int t = start; + start = end; + end = t; + } + final int n = read(dst.limit(end).position(start)); + assertEquals("Number of bytes", Math.min(end - start, n), n); + assertEquals("Buffer position", start + Math.max(n, 0), dst.flip().position(start).limit()); + return n >= 0; + } + } + + /** + * Tests random operations on a stream of computed values. + * The bytes values are determined by their position, which allows easy verifications. + * + * @throws IOException if an error occurred when reading or writing to the temporary file. + */ + @Test + public void testRandomOperations() throws IOException { + final Random random = TestUtilities.createRandomNumberGenerator(); + final Implementation channel = new Implementation("test", random); + final ByteBuffer buffer = ByteBuffer.allocate(random.nextInt(1000) + 1000); + int position = 0; + for (int i=0; i<10000; i++) { + assertTrue(channel.isOpen()); + assertEquals(position, channel.position()); + if (random.nextInt(4) == 0) { + position = random.nextInt(channel.length - 1); + int end = random.nextInt(channel.length - 1); + if (position > end) { + int t = position; + position = end; + end = t; + } + channel.position(position, end - position + 1); + } + channel.readInRandomRegion(buffer); + while (buffer.hasRemaining()) { + assertEquals(ComputedInputStream.valueAt(position++), buffer.get()); + } + } + assertEquals(position, channel.position()); + channel.close(); // Intentionally no "try with resource". + assertFalse(channel.isOpen()); + } + + /** + * Tests the constructor that parse HTTP ranges. 
+ * + * @see FileCacheByteChannel.Connection#Connection(InputStream, String, Iterable, OptionalLong) + */ + @Test + public void testParseRange() { + FileCacheByteChannel.Connection c; + c = new FileCacheByteChannel.Connection(null, "bytes 25000-75000/100000", List.of("bytes"), OptionalLong.empty()); + assertEquals( 25000, c.start); + assertEquals( 75000, c.end); + assertEquals(100000, c.length); + + c = new FileCacheByteChannel.Connection(null, "bytes 25000-75000", List.of("bytes"), OptionalLong.empty()); + assertEquals( 25000, c.start); + assertEquals( 75000, c.end); + assertEquals( -1, c.length); + + c = new FileCacheByteChannel.Connection(null, "bytes 25000/100000", List.of("bytes"), OptionalLong.empty()); + assertEquals( 25000, c.start); + assertEquals(100000, c.end); + assertEquals(100000, c.length); + + // Not legal, but we test robustness. + c = new FileCacheByteChannel.Connection(null, "25000", List.of("bytes"), OptionalLong.empty()); + assertEquals( 25000, c.start); + assertEquals( -1, c.end); + assertEquals( -1, c.length); + } +} diff --git a/storage/sis-storage/src/test/java/org/apache/sis/test/suite/StorageTestSuite.java b/storage/sis-storage/src/test/java/org/apache/sis/test/suite/StorageTestSuite.java index a669c14342..70dc7d492c 100644 --- a/storage/sis-storage/src/test/java/org/apache/sis/test/suite/StorageTestSuite.java +++ b/storage/sis-storage/src/test/java/org/apache/sis/test/suite/StorageTestSuite.java @@ -26,7 +26,7 @@ import org.junit.BeforeClass; * * @author Martin Desruisseaux (Geomatys) * @author Alexis Manin (Geomatys) - * @version 1.3 + * @version 1.4 * @since 0.3 * @module */ @@ -40,6 +40,7 @@ import org.junit.BeforeClass; org.apache.sis.internal.storage.io.ChannelImageOutputStreamTest.class, org.apache.sis.internal.storage.io.HyperRectangleReaderTest.class, org.apache.sis.internal.storage.io.RewindableLineReaderTest.class, + org.apache.sis.internal.storage.io.FileCacheByteChannelTest.class, 
org.apache.sis.internal.storage.MetadataBuilderTest.class, org.apache.sis.internal.storage.RangeArgumentTest.class, org.apache.sis.internal.storage.MemoryGridResourceTest.class,