Vinay Devadiga created HADOOP-19220:
---------------------------------------
Summary: S3A: S3AInputStream positioned readFully expectation
Key: HADOOP-19220
URL: https://issues.apache.org/jira/browse/HADOOP-19220
Project: Hadoop Common
Issue Type: Bug
Components: fs/s3
Reporter: Vinay Devadiga

I was writing some unit tests for the S3AInputStream readFully method:

{code:java}
package org.apache.hadoop.fs.s3a;

import java.io.EOFException;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.SocketException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.audit.impl.NoopSpan;
import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
import org.apache.hadoop.util.functional.CallableRaisingIOE;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Test;
import software.amazon.awssdk.awscore.exception.AwsErrorDetails;
import software.amazon.awssdk.awscore.exception.AwsServiceException;
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.http.AbortableInputStream;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;

import static java.lang.Math.min;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.hadoop.fs.s3a.Constants.ASYNC_DRAIN_THRESHOLD;
import static org.apache.hadoop.fs.s3a.Constants.AWS_REGION;
import static org.apache.hadoop.fs.s3a.Constants.FS_S3A;
import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_MIN_SIZE;
import static org.apache.hadoop.fs.s3a.Constants.S3_CLIENT_FACTORY_IMPL;
import static org.apache.hadoop.util.functional.FutureIO.eval;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;

public class TestReadFullyAndPositionalRead {

  private S3AFileSystem fs;
  private S3AInputStream input;
  private S3Client s3;

  private static final String EMPTY = "";
  private static final String INPUT = "test_content";

  @Before
  public void setUp() throws IOException {
    Configuration conf = createConfiguration();
    fs = new S3AFileSystem();
    URI uri = URI.create(FS_S3A + "://" + MockS3AFileSystem.BUCKET);
    // Unset S3CSE property from config to avoid pathIOE.
    conf.unset(Constants.S3_ENCRYPTION_ALGORITHM);
    fs.initialize(uri, conf);
    s3 = fs.getS3AInternals().getAmazonS3Client("mocking");
  }

  public Configuration createConfiguration() {
    Configuration conf = new Configuration();
    conf.setClass(S3_CLIENT_FACTORY_IMPL, MockS3ClientFactory.class,
        S3ClientFactory.class);
    // use minimum multipart size for faster triggering
    conf.setLong(Constants.MULTIPART_SIZE, MULTIPART_MIN_SIZE);
    conf.setInt(Constants.S3A_BUCKET_PROBE, 1);
    // this is so stream draining is always blocking, allowing assertions
    // to be safely made without worrying about any race conditions
    conf.setInt(ASYNC_DRAIN_THRESHOLD, Integer.MAX_VALUE);
    // set the region to avoid the getBucketLocation on FS init.
    conf.set(AWS_REGION, "eu-west-1");
    return conf;
  }

  @Test
  public void testReadFullyFromBeginning() throws IOException {
    input = getMockedS3AInputStream(INPUT);
    byte[] byteArray = new byte[INPUT.length()];
    input.readFully(0, byteArray, 0, byteArray.length);
    assertThat(new String(byteArray, UTF_8)).isEqualTo(INPUT);
  }

  @Test
  public void testReadFullyWithOffsetAndLength() throws IOException {
    input = getMockedS3AInputStream(INPUT);
    byte[] byteArray = new byte[4];
    input.readFully(5, byteArray, 0, 4);
    assertThat(new String(byteArray, UTF_8)).isEqualTo("cont");
  }

  @Test
  public void testReadFullyWithOffsetBeyondStream() throws IOException {
    input = getMockedS3AInputStream(INPUT);
    byte[] byteArray = new byte[10];
    assertThatExceptionOfType(EOFException.class)
        .isThrownBy(() -> input.readFully(20, byteArray, 0, 10));
  }

  private S3AInputStream getMockedS3AInputStream(String input) {
    Path path = new Path("test-path");
    String eTag = "test-etag";
    String versionId = "test-version-id";
    String owner = "test-owner";
    S3AFileStatus s3AFileStatus = new S3AFileStatus(input.length(), 0, path,
        input.length(), owner, eTag, versionId);
    S3ObjectAttributes s3ObjectAttributes = new S3ObjectAttributes(
        fs.getBucket(), path, fs.pathToKey(path),
        fs.getS3EncryptionAlgorithm(),
        new EncryptionSecrets().getEncryptionKey(),
        eTag, versionId, input.length());
    S3AReadOpContext s3AReadOpContext =
        fs.createReadContext(s3AFileStatus, NoopSpan.INSTANCE);
    return new S3AInputStream(s3AReadOpContext, s3ObjectAttributes,
        getMockedInputStreamCallback(input),
        s3AReadOpContext.getS3AStatisticsContext().newInputStreamStatistics(),
        BlockingThreadPoolExecutorService.newInstance(2, 40, 60,
            TimeUnit.SECONDS, "s3a-bounded"));
  }

  private S3AInputStream.InputStreamCallbacks getMockedInputStreamCallback(
      String input) {
    GetObjectResponse objectResponse =
        GetObjectResponse.builder().eTag("test-etag").build();
    ResponseInputStream<GetObjectResponse>[] responseInputStreams =
        new ResponseInputStream[] {
            getMockedInputStream(objectResponse, true, input),
            getMockedInputStream(objectResponse, true, input),
            getMockedInputStream(objectResponse, false, input)
        };
    return new S3AInputStream.InputStreamCallbacks() {
      private Integer mockedS3ObjectIndex = 0;

      @Override
      public ResponseInputStream<GetObjectResponse> getObject(
          GetObjectRequest request) {
        mockedS3ObjectIndex++;
        if (mockedS3ObjectIndex == 3) {
          throw AwsServiceException.builder()
              .message("Failed to get S3Object")
              .awsErrorDetails(
                  AwsErrorDetails.builder().errorCode("test-code").build())
              .build();
        }
        return responseInputStreams[
            min(mockedS3ObjectIndex, responseInputStreams.length) - 1];
      }

      @Override
      public GetObjectRequest.Builder newGetRequestBuilder(String key) {
        return GetObjectRequest.builder().bucket(fs.getBucket()).key(key);
      }

      @Override
      public <T> CompletableFuture<T> submit(final CallableRaisingIOE<T> task) {
        return eval(task);
      }

      @Override
      public void close() {
      }
    };
  }

  private ResponseInputStream<GetObjectResponse> getMockedInputStream(
      GetObjectResponse response, boolean success, String input) {
    FilterInputStream stream = new FilterInputStream(AbortableInputStream.create(
        IOUtils.toInputStream(input, StandardCharsets.UTF_8), () -> { })) {
      @Override
      public void close() throws IOException {
        super.close();
        if (!success) {
          throw new SocketException("Socket closed");
        }
      }
    };
    return new ResponseInputStream<>(response, stream);
  }
}
{code}

Running the tests, this one fails:

{noformat}
[ERROR] TestReadFullyAndPositionalRead.testPositionalReadWithOffsetAndLength:136 expected:<"[con]t"> but was:<"[tes]t">
{noformat}

The read is not honouring the position parameter; it returns only the initial bytes of the object.
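A plausible cause, judging from the mock above rather than from S3AInputStream itself: getMockedInputStream() always wraps the complete INPUT string, and getObject() never consults the Range header of the incoming GetObjectRequest, so every (re)open of the object serves bytes from offset 0 regardless of the position requested. Below is a sketch of a range-aware version of the mock; the rangeSlice helper is hypothetical, not an existing S3A test utility:

{code:java}
// Hypothetical helper for the mock above: honour the HTTP Range header
// ("bytes=start-end", end inclusive) that S3AInputStream attaches to the
// GET request when it opens the object at a given position.
private static String rangeSlice(String content, GetObjectRequest request) {
  String range = request.range();               // e.g. "bytes=5-11"
  if (range == null || !range.startsWith("bytes=")) {
    return content;                             // no range: full object
  }
  String[] bounds = range.substring("bytes=".length()).split("-");
  int start = Math.min(Integer.parseInt(bounds[0]), content.length());
  // HTTP range ends are inclusive; "bytes=5-" means "from 5 to EOF"
  int end = (bounds.length > 1 && !bounds[1].isEmpty())
      ? Math.min(Integer.parseInt(bounds[1]) + 1, content.length())
      : content.length();
  return content.substring(start, Math.max(start, end));
}

// getObject() would then build each mocked stream from the sliced content:
//   return getMockedInputStream(objectResponse, true,
//       rangeSlice(input, request));
{code}

With a change along those lines, readFully(5, byteArray, 0, 4) should see a stream that really starts at byte 5 and return "cont".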
What is the expected behaviour of the readFully function in S3AInputStream?
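For reference: S3AInputStream implements Hadoop's PositionedReadable contract, so readFully(position, buffer, offset, length) is expected to read exactly length bytes starting at the absolute offset position in the object, independent of the stream's current position, leave getPos() unchanged afterwards, and throw EOFException if the object ends before length bytes could be read (which is what testReadFullyWithOffsetBeyondStream asserts). A minimal sketch of that contract in terms of seek() and read(), not the actual S3A implementation (which adds retries, statistics, and ranged GETs):

{code:java}
import java.io.EOFException;
import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;

/** Sketch of the PositionedReadable.readFully contract. */
final class ReadFullyContract {

  static void readFully(FSDataInputStream in, long position,
      byte[] buffer, int offset, int length) throws IOException {
    if (position < 0) {
      throw new EOFException("Cannot read from negative position " + position);
    }
    long oldPos = in.getPos();
    try {
      in.seek(position);                 // absolute offset in the file
      int nread = 0;
      while (nread < length) {
        int nbytes = in.read(buffer, offset + nread, length - nread);
        if (nbytes < 0) {
          // EOF before length bytes: the contract demands EOFException
          throw new EOFException("End of stream after reading " + nread
              + " of " + length + " bytes at position " + position);
        }
        nread += nbytes;
      }
    } finally {
      in.seek(oldPos);                   // a positioned read must not move getPos()
    }
  }
}
{code}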