This is an automated email from the ASF dual-hosted git repository. desruisseaux pushed a commit to branch geoapi-4.0 in repository https://gitbox.apache.org/repos/asf/sis.git
The following commit(s) were added to refs/heads/geoapi-4.0 by this push: new 4ff2a5e381 Bug fixes (EOFException and bad content caused by bad position). Also fix a bug that prevented `HttpByteChannel` to effectively use "HTTP Range". 4ff2a5e381 is described below commit 4ff2a5e381af7166520fdbe4c36bf6696b68b830 Author: Martin Desruisseaux <martin.desruisse...@geomatys.com> AuthorDate: Sat Dec 24 15:32:15 2022 +0100 Bug fixes (EOFException and bad content caused by bad position). Also fix a bug that prevented `HttpByteChannel` to effectively use "HTTP Range". --- .../apache/sis/cloud/aws/s3/CachedByteChannel.java | 26 ++-- .../internal/storage/io/FileCacheByteChannel.java | 142 +++++++++++++-------- .../sis/internal/storage/io/HttpByteChannel.java | 21 ++- .../storage/io/FileCacheByteChannelTest.java | 61 +++++++-- 4 files changed, 169 insertions(+), 81 deletions(-) diff --git a/cloud/sis-cloud-aws/src/main/java/org/apache/sis/cloud/aws/s3/CachedByteChannel.java b/cloud/sis-cloud-aws/src/main/java/org/apache/sis/cloud/aws/s3/CachedByteChannel.java index 4bb3ae9cf1..b364cb92f1 100644 --- a/cloud/sis-cloud-aws/src/main/java/org/apache/sis/cloud/aws/s3/CachedByteChannel.java +++ b/cloud/sis-cloud-aws/src/main/java/org/apache/sis/cloud/aws/s3/CachedByteChannel.java @@ -72,8 +72,6 @@ final class CachedByteChannel extends FileCacheByteChannel { @Override protected Connection openConnection(final long start, final long end) throws IOException { final ResponseInputStream<GetObjectResponse> stream; - final String contentRange, acceptRanges; - final Long contentLength; try { GetObjectRequest.Builder builder = GetObjectRequest.builder().bucket(path.bucket).key(path.key); final String range = Connection.formatRange(start, end); @@ -82,19 +80,23 @@ final class CachedByteChannel extends FileCacheByteChannel { } stream = path.fs.client().getObject(builder.build()); final GetObjectResponse response = stream.response(); - contentLength = response.contentLength(); - contentRange = response.contentRange(); - acceptRanges = response.acceptRanges(); + final String contentRange = response.contentRange(); + final String acceptRanges = response.acceptRanges(); + final List<String> rangeUnits = (acceptRanges != null) ? List.of(acceptRanges) : List.of(); + try { + if (contentRange == null) { + final Long contentLength = response.contentLength(); + final long length = (contentLength != null) ? contentLength : -1; + return new Connection(stream, length, rangeUnits); + } else { + return new Connection(stream, contentRange, rangeUnits); + } + } catch (IllegalArgumentException e) { + throw new IOException(e); + } } catch (SdkException e) { throw FileService.failure(path, e); } - final List<String> rangeUnits = (acceptRanges != null) ? List.of(acceptRanges) : List.of(); - final long length = (contentLength != null) ? contentLength : -1; - try { - return new Connection(stream, contentRange, length, rangeUnits); - } catch (IllegalArgumentException e) { - throw new IOException(e); - } } /** diff --git a/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/FileCacheByteChannel.java b/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/FileCacheByteChannel.java index 092fd52bc0..51c0ddccca 100644 --- a/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/FileCacheByteChannel.java +++ b/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/FileCacheByteChannel.java @@ -95,7 +95,7 @@ public abstract class FileCacheByteChannel implements SeekableByteChannel { /** Position of the last byte read by the input stream (inclusive). */ final long end; - /** Total length of the stream, or -1 is unknown. */ + /** Number of bytes in the full stream, or -1 is unknown. */ final long length; /** Whether connection can be created for ranges of bytes. */ @@ -104,56 +104,71 @@ public abstract class FileCacheByteChannel implements SeekableByteChannel { /** * Creates information about a connection. * - * @param input the input stream for reading the bytes. - * @param start position of the first byte read by the input stream (inclusive). - * @param end position of the last byte read by the input stream (inclusive). - * @param contentLength total length of the stream, or -1 if unknown. - * @param acceptRanges whether connection can be created for ranges of bytes. + * @param input the input stream for reading the bytes. + * @param start position of the first byte read by the input stream (inclusive). + * @param end position of the last byte read by the input stream (inclusive). + * @param length length of the full stream (not the content length), or -1 if unknown. + * @param acceptRanges whether connection can be created for ranges of bytes. * * @see #openConnection(long, long) */ - public Connection(final InputStream input, final long start, final long end, final long contentLength, final boolean acceptRanges) { + public Connection(final InputStream input, final long start, final long end, final long length, final boolean acceptRanges) { this.input = input; this.start = start; this.end = end; - this.length = contentLength; + this.length = length; this.acceptRanges = acceptRanges; } /** - * Creates information about a connection by parsing HTTP header. - * Example: "Content-Range: bytes 25000-75000/100000". + * Creates information about a connection by parsing HTTP header without content range. + * The "Content-Length" header value is useful to this class only if the connection was + * opened for the full file. + * + * @param input the input stream for reading the bytes. + * @param contentLength length of the response content, or -1 if unknown. + * @param acceptRanges value of "Accept-Ranges" in HTTP header. + * @throws IllegalArgumentException if the start, end or length cannot be parsed. + */ + public Connection(final InputStream input, final long contentLength, final Iterable<String> acceptRanges) { + this.input = input; + this.start = 0; + this.end = (contentLength > 0) ? contentLength - 1 : Long.MAX_VALUE; + this.length = contentLength; + this.acceptRanges = acceptRanges(acceptRanges); + } + + /** + * Creates information about a connection by parsing HTTP header with content range. + * Note that the "Content-Length" header value is not useful when a range is specified + * because the content length is not the full length of the file. + * + * <p>Example of content range value: {@code "Content-Range: bytes 25000-75000/100000"}.</p> * * @param input the input stream for reading the bytes. * @param contentRange value of "Content-Range" in HTTP header, or {@code null} if none. * @param acceptRanges value of "Accept-Ranges" in HTTP header. - * @param contentLength total length of the stream, or -1 if unknown. * @throws IllegalArgumentException if the start, end or length cannot be parsed. */ - public Connection(final InputStream input, String contentRange, long contentLength, final Iterable<String> acceptRanges) { + public Connection(final InputStream input, String contentRange, final Iterable<String> acceptRanges) { this.input = input; - if (contentRange == null) { - start = 0; - end = (contentLength > 0) ? contentLength - 1 : Long.MAX_VALUE; - length = contentLength; - } else { - contentRange = contentRange.trim(); - int s = contentRange.indexOf(' '); - if (s >= 0 && (s != RANGES_UNIT.length() || !contentRange.regionMatches(true, 0, RANGES_UNIT, 0, s))) { - throw new IllegalArgumentException(Errors.format(Errors.Keys.UnsupportedArgumentValue_1, contentRange)); - } - int rs = contentRange.indexOf('-', ++s); // Index of range separator. - int ls = contentRange.indexOf('/', Math.max(s, rs+1)); // Index of length separator. - if (contentLength < 0 && ls >= 0) { - final String t = contentRange.substring(ls+1).trim(); - if (!t.equals("*")) contentLength = Long.parseLong(t); - } - length = contentLength; - if (ls < 0) ls = contentRange.length(); - if (rs < 0) rs = ls; - start = Long.parseLong(contentRange.substring(s, rs).trim()); - end = (rs < ls) ? Long.parseLong(contentRange.substring(rs+1, ls).trim()) : length; + long contentLength = -1; + contentRange = contentRange.trim(); + int s = contentRange.indexOf(' '); + if (s >= 0 && (s != RANGES_UNIT.length() || !contentRange.regionMatches(true, 0, RANGES_UNIT, 0, s))) { + throw new IllegalArgumentException(Errors.format(Errors.Keys.UnsupportedArgumentValue_1, contentRange)); } + int rs = contentRange.indexOf('-', ++s); // Index of range separator. + int ls = contentRange.indexOf('/', Math.max(s, rs+1)); // Index of length separator. + if (ls >= 0) { + final String t = contentRange.substring(ls+1).trim(); + if (!t.equals("*")) contentLength = Long.parseLong(t); + } + length = contentLength; + if (ls < 0) ls = contentRange.length(); + if (rs < 0) rs = ls; + start = Long.parseLong(contentRange.substring(s, rs).trim()); + end = (rs < ls) ? Long.parseLong(contentRange.substring(rs+1, ls).trim()) : length; this.acceptRanges = acceptRanges(acceptRanges); } @@ -260,9 +275,11 @@ public abstract class FileCacheByteChannel implements SeekableByteChannel { private final RangeSet<Long> rangesOfAvailableBytes; /** - * Number of bytes in the full stream, or 0 if not yet computed. + * Number of bytes in the full stream, or -1 if not yet computed. + * It will be set to {@link Connection#length} when a connection is established, + * and updated for every new connection in case the value change. */ - private long length; + private long length = -1; /** * Creates a new channel which will cache bytes in a temporary file. @@ -333,6 +350,14 @@ public abstract class FileCacheByteChannel implements SeekableByteChannel { */ @Override public synchronized long size() throws IOException { + if (length < 0) { + if (connection == null) { + openConnection(); + } + if (length < 0) { + throw new IOException(Errors.format(Errors.Keys.Uninitialized_1, "size")); + } + } return length; } @@ -356,7 +381,11 @@ public abstract class FileCacheByteChannel implements SeekableByteChannel { */ @Override public synchronized SeekableByteChannel position(final long newPosition) throws IOException { - ArgumentChecks.ensurePositive("newPosition", newPosition); + if (length > 0) { + ArgumentChecks.ensureBetween("newPosition", 0, length-1, newPosition); + } else { + ArgumentChecks.ensurePositive("newPosition", newPosition); + } position = newPosition; if (endOfInterest - newPosition < SKIP_THRESHOLD) { endOfInterest = 0; // Read until end of stream. @@ -414,6 +443,9 @@ public abstract class FileCacheByteChannel implements SeekableByteChannel { * or when the number of bytes to skip is too small for being worth to create a new connection. * This method may skip less bytes than requested. The skipped bytes are saved in the cache. * + * <p>The {@link #position} field (the channel position) is not modified by this method. + * This method is invoked when input position needs to become equal to the channel position.</p> + * * @param count number of bytes to skip. * @return remaining number of bytes to skip after this method execution. * @throws IOException if an I/O error occurred. @@ -434,9 +466,10 @@ public abstract class FileCacheByteChannel implements SeekableByteChannel { if (n != 0 || (n = input.read()) < 0) { // Block until we get one byte. break; // End of stream, but maybe it was a sub-range. } - buffer.put((byte) n); + buffer.put(0, (byte) n); // Do not increment buffer position. n = 1; } + assert buffer.position() == 0; cache(buffer.limit(n)); count -= n; } while (count > 0); @@ -470,10 +503,10 @@ public abstract class FileCacheByteChannel implements SeekableByteChannel { */ Connection c = connection; long offset = position - file.position(); - if (offset != 0 && c != null) { + if (c != null) { if ((offset < 0 || (c.acceptRanges && (offset >= SKIP_THRESHOLD || position > c.end)))) { offset -= drainAndAbort(); - c = connection; + c = connection; // May become null as a result of `drainAndAbort()`. } } /* @@ -488,13 +521,14 @@ public abstract class FileCacheByteChannel implements SeekableByteChannel { } offset = skipInInput(offset); if (offset != 0) { - count = readFromCache(dst); + count = readFromCache(dst); // In case `skipInInput(…)` has read more bytes than desired. usedConnection(); if (count >= 0) { return count; } throw new EOFException(Errors.format(Errors.Keys.ValueOutOfRange_4, "position", 0, length, position)); } + assert file.position() == position; /* * Get a buffer that we can use with `InputStream.read(byte[])`. * It must be a buffer backed by a Java array. @@ -507,29 +541,28 @@ public abstract class FileCacheByteChannel implements SeekableByteChannel { buffer.clear().limit(dst.remaining()); } /* - * Transfer bytes from the input stream to the buffer. - * The bytes are also copied to the temporary file. + * Transfer bytes from the input stream to the buffer. The bytes are also copied to the temporary file. + * We try to use `dst` instead of `buffer` in call to `cache(…)` because the former may be a direct buffer. */ - final int limit = buffer.limit(); - final int start = buffer.position(); - count = c.input.read(buffer.array(), Math.addExact(buffer.arrayOffset(), start), buffer.remaining()); + final ByteBuffer slice = dst.slice(); + count = c.input.read(buffer.array(), Math.addExact(buffer.arrayOffset(), buffer.position()), buffer.remaining()); if (count > 0) { - try { - cache(buffer.limit(start + count)); - } finally { - buffer.limit(limit); - } + position += count; if (buffer != dst) { - dst.put(buffer.flip()); // Transfer from temporary buffer to destination buffer. + dst.put(buffer.limit(count)); // Transfer from temporary buffer to destination buffer. + } else { + dst.position(dst.position() + count); } - position += count; + cache(slice.limit(count)); } usedConnection(); return count; } /** - * Attempts to read up to bytes from the cache. + * Attempts to read up to <i>r</i> bytes from the cache. + * This method does not use the connection (it may be null). + * The {@link #position} field is updated by the amount of bytes read. * * @param dst the buffer where to store the bytes that are read. * @return number of bytes read, or -1 if the cache does not contain the requested range of bytes. @@ -547,10 +580,11 @@ public abstract class FileCacheByteChannel implements SeekableByteChannel { final int count; try { count = file.read(dst.limit(end), position); - position += count; + if (count >= 0) position += count; } finally { dst.limit(limit); } + assert dst.position() == start + count; return count; } diff --git a/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/HttpByteChannel.java b/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/HttpByteChannel.java index fad086563a..3329a1865e 100644 --- a/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/HttpByteChannel.java +++ b/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/HttpByteChannel.java @@ -121,10 +121,27 @@ final class HttpByteChannel extends FileCacheByteChannel { range = headers.firstValue("Content-Range").orElse(null); final List<String> rangeUnits = headers.allValues("Accept-Ranges"); try { - final long length = headers.firstValueAsLong("Content-Length").orElse(-1); - return new Connection(stream, range, length, rangeUnits); + if (range == null) { + final long length = headers.firstValueAsLong("Content-Length").orElse(-1); + return new Connection(stream, length, rangeUnits); + } else { + return new Connection(stream, range, rangeUnits); + } } catch (IllegalArgumentException e) { throw new IOException(e); } } + + /** + * Invoked when this channel is no longer interested in reading bytes from the specified stream. + * + * @param input the input stream to eventually close. + * @return whether the given input stream has been closed by this method. + * @throws IOException if an error occurred while closing the stream or preparing for next read operations. + */ + @Override + protected boolean abort(final InputStream input) throws IOException { + input.close(); + return true; + } } diff --git a/storage/sis-storage/src/test/java/org/apache/sis/internal/storage/io/FileCacheByteChannelTest.java b/storage/sis-storage/src/test/java/org/apache/sis/internal/storage/io/FileCacheByteChannelTest.java index 6c633f5768..ca4e4bf0b1 100644 --- a/storage/sis-storage/src/test/java/org/apache/sis/internal/storage/io/FileCacheByteChannelTest.java +++ b/storage/sis-storage/src/test/java/org/apache/sis/internal/storage/io/FileCacheByteChannelTest.java @@ -16,12 +16,13 @@ */ package org.apache.sis.internal.storage.io; +import java.util.List; import java.util.Random; import java.util.OptionalLong; +import java.util.function.IntFunction; import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; -import java.util.List; import org.apache.sis.test.TestCase; import org.apache.sis.test.TestUtilities; import org.junit.Test; @@ -92,11 +93,14 @@ public final strictfp class FileCacheByteChannelTest extends TestCase { @Override protected Connection openConnection(long start, long end) { assertTrue(end >= 0); - if (end >= length) end = length - 1; - start = Math.max(start - random.nextInt(40), 0); - end = Math.min(end + random.nextInt(40), length - 1); // Inclusive. + if (end < length) end++; // Exclusive (temporarily). + else end = length; // Replace Long.MAX_VALUE. + do { + start = Math.max(start - random.nextInt(40), 0); + end = Math.min(end + random.nextInt(40), length); + } while (start >= end); var input = new ComputedInputStream(Math.toIntExact(start), Math.toIntExact(end), random); - return new Connection(input, start, end, length, true); + return new Connection(input, start, end-1, length, true); } /** @@ -140,14 +144,36 @@ public final strictfp class FileCacheByteChannelTest extends TestCase { */ @Test public void testRandomOperations() throws IOException { + testRandomOperations(ByteBuffer::allocate); + } + + /** + * Tests random operations on a stream of computed values using a direct buffer. + * The code paths are slightly different compared to {@link #testRandomOperations()}. + * + * @throws IOException if an error occurred when reading or writing to the temporary file. + */ + @Test + public void testWithDirectBuffer() throws IOException { + testRandomOperations(ByteBuffer::allocateDirect); + } + + /** + * Implementation of {@link #testRandomOperations()} and {@link #testWithDirectBuffer()}. + * + * @param allocator the function to invoke for allocating a byte buffer. + * @throws IOException if an error occurred when reading or writing to the temporary file. + */ + private void testRandomOperations(final IntFunction<ByteBuffer> allocator) throws IOException { final Random random = TestUtilities.createRandomNumberGenerator(); final Implementation channel = new Implementation("test", random); - final ByteBuffer buffer = ByteBuffer.allocate(random.nextInt(1000) + 1000); + final ByteBuffer buffer = allocator.apply(random.nextInt(1000) + 1000); int position = 0; - for (int i=0; i<10000; i++) { + for (int i=0; i<5000; i++) { assertTrue(channel.isOpen()); assertEquals(position, channel.position()); - if (random.nextInt(4) == 0) { + final boolean seek = random.nextInt(4) == 0; + if (seek) { position = random.nextInt(channel.length - 1); int end = random.nextInt(channel.length - 1); if (position > end) { @@ -160,7 +186,16 @@ public final strictfp class FileCacheByteChannelTest extends TestCase { } channel.readInRandomRegion(buffer); while (buffer.hasRemaining()) { - assertEquals(ComputedInputStream.valueAt(position++), buffer.get()); + final byte expected = ComputedInputStream.valueAt(position++); + final byte actual = buffer.get(); + if (expected != actual) { + final var b = new StringBuilder(100).append("During iteration ").append(i) + .append(": Wrong byte value at position ").append(position); + if (seek) { + b.append(" (after seek)"); + } + fail(b.append(". Expected ").append(expected).append(" but got ").append(actual).append('.').toString()); + } } } assertEquals(position, channel.position()); @@ -177,23 +212,23 @@ public final strictfp class FileCacheByteChannelTest extends TestCase { public void testParseRange() { final List<String> rangesUnit = List.of("bytes"); FileCacheByteChannel.Connection c; - c = new FileCacheByteChannel.Connection(null, "bytes 25000-75000/100000", -1, rangesUnit); + c = new FileCacheByteChannel.Connection(null, "bytes 25000-75000/100000", rangesUnit); assertEquals( 25000, c.start); assertEquals( 75000, c.end); assertEquals(100000, c.length); - c = new FileCacheByteChannel.Connection(null, "bytes 25000-75000", -1, rangesUnit); + c = new FileCacheByteChannel.Connection(null, "bytes 25000-75000", rangesUnit); assertEquals( 25000, c.start); assertEquals( 75000, c.end); assertEquals( -1, c.length); - c = new FileCacheByteChannel.Connection(null, "bytes 25000/100000", -1, rangesUnit); + c = new FileCacheByteChannel.Connection(null, "bytes 25000/100000", rangesUnit); assertEquals( 25000, c.start); assertEquals(100000, c.end); assertEquals(100000, c.length); // Not legal, but we test robustness. - c = new FileCacheByteChannel.Connection(null, "25000", -1, rangesUnit); + c = new FileCacheByteChannel.Connection(null, "25000", rangesUnit); assertEquals( 25000, c.start); assertEquals( -1, c.end); assertEquals( -1, c.length);