Vinay Devadiga created HADOOP-19220:
---------------------------------------

             Summary: S3A : S3AInputStream positioned readFully Expectation
                 Key: HADOOP-19220
                 URL: https://issues.apache.org/jira/browse/HADOOP-19220
             Project: Hadoop Common
          Issue Type: Bug
          Components: fs/s3
            Reporter: Vinay Devadiga


I was writing a unit test for the readFully method of S3AInputStream:

package org.apache.hadoop.fs.s3a;

import java.io.EOFException;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.SocketException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.audit.impl.NoopSpan;
import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
import org.apache.hadoop.util.functional.CallableRaisingIOE;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Test;
import software.amazon.awssdk.awscore.exception.AwsErrorDetails;
import software.amazon.awssdk.awscore.exception.AwsServiceException;
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.http.AbortableInputStream;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;

import static java.lang.Math.min;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.hadoop.fs.s3a.Constants.ASYNC_DRAIN_THRESHOLD;
import static org.apache.hadoop.fs.s3a.Constants.AWS_REGION;
import static org.apache.hadoop.fs.s3a.Constants.FS_S3A;
import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_MIN_SIZE;
import static org.apache.hadoop.fs.s3a.Constants.S3_CLIENT_FACTORY_IMPL;
import static org.apache.hadoop.util.functional.FutureIO.eval;
import static org.assertj.core.api.Assertions.assertThat;
import static 
org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;

public class TestReadFullyAndPositionalRead {

    private S3AFileSystem fs;
    private S3AInputStream input;
    private S3Client s3;
    private static final String EMPTY = "";
    private static final String INPUT = "test_content";

    @Before
    public void setUp() throws IOException {
        Configuration conf = createConfiguration();
        fs = new S3AFileSystem();
        URI uri = URI.create(FS_S3A + "://" + MockS3AFileSystem.BUCKET);
        // Unset S3CSE property from config to avoid pathIOE.
        conf.unset(Constants.S3_ENCRYPTION_ALGORITHM);
        fs.initialize(uri, conf);
        s3 = fs.getS3AInternals().getAmazonS3Client("mocking");
    }

    public Configuration createConfiguration() {
        Configuration conf = new Configuration();
        conf.setClass(S3_CLIENT_FACTORY_IMPL, MockS3ClientFactory.class, 
S3ClientFactory.class);
        // use minimum multipart size for faster triggering
        conf.setLong(Constants.MULTIPART_SIZE, MULTIPART_MIN_SIZE);
        conf.setInt(Constants.S3A_BUCKET_PROBE, 1);
        // this is so stream draining is always blocking, allowing assertions 
to be safely made without worrying about any race conditions
        conf.setInt(ASYNC_DRAIN_THRESHOLD, Integer.MAX_VALUE);
        // set the region to avoid the getBucketLocation on FS init.
        conf.set(AWS_REGION, "eu-west-1");
        return conf;
    }

    @Test
    public void testReadFullyFromBeginning() throws IOException {
        input = getMockedS3AInputStream(INPUT);
        byte[] byteArray = new byte[INPUT.length()];
        input.readFully(0, byteArray, 0, byteArray.length);
        assertThat(new String(byteArray, UTF_8)).isEqualTo(INPUT);
    }

    @Test
    public void testReadFullyWithOffsetAndLength() throws IOException {
        input = getMockedS3AInputStream(INPUT);
        byte[] byteArray = new byte[4];
        input.readFully(5, byteArray, 0, 4);
        assertThat(new String(byteArray, UTF_8)).isEqualTo("cont");
    }

    @Test
    public void testReadFullyWithOffsetBeyondStream() throws IOException {
        input = getMockedS3AInputStream(INPUT);
        byte[] byteArray = new byte[10];
        assertThatExceptionOfType(EOFException.class)
                .isThrownBy(() -> input.readFully(20, byteArray, 0, 10));
    }

    private S3AInputStream getMockedS3AInputStream(String input) {
        Path path = new Path("test-path");
        String eTag = "test-etag";
        String versionId = "test-version-id";
        String owner = "test-owner";
        S3AFileStatus s3AFileStatus = new S3AFileStatus(input.length(), 0, 
path, input.length(), owner, eTag, versionId);
        S3ObjectAttributes s3ObjectAttributes = new S3ObjectAttributes(
                fs.getBucket(), path, fs.pathToKey(path), 
fs.getS3EncryptionAlgorithm(), new EncryptionSecrets().getEncryptionKey(), 
eTag, versionId, input.length());
        S3AReadOpContext s3AReadOpContext = fs.createReadContext(s3AFileStatus, 
NoopSpan.INSTANCE);
        return new S3AInputStream(s3AReadOpContext, s3ObjectAttributes, 
getMockedInputStreamCallback(input), 
s3AReadOpContext.getS3AStatisticsContext().newInputStreamStatistics(), 
BlockingThreadPoolExecutorService.newInstance(2, 40, 60, TimeUnit.SECONDS, 
"s3a-bounded"));
    }

    private S3AInputStream.InputStreamCallbacks 
getMockedInputStreamCallback(String input) {
        GetObjectResponse objectResponse = 
GetObjectResponse.builder().eTag("test-etag").build();
        ResponseInputStream<GetObjectResponse>[] responseInputStreams = new 
ResponseInputStream[] {
                getMockedInputStream(objectResponse, true, input),
                getMockedInputStream(objectResponse, true, input),
                getMockedInputStream(objectResponse, false, input)
        };
        return new S3AInputStream.InputStreamCallbacks() {
            private Integer mockedS3ObjectIndex = 0;
            @Override
            public ResponseInputStream<GetObjectResponse> 
getObject(GetObjectRequest request) {
                mockedS3ObjectIndex++;
                if (mockedS3ObjectIndex == 3) {
                    throw AwsServiceException.builder()
                            .message("Failed to get S3Object")
                            
.awsErrorDetails(AwsErrorDetails.builder().errorCode("test-code").build())
                            .build();
                }
                return responseInputStreams[min(mockedS3ObjectIndex, 
responseInputStreams.length) - 1];
            }

            @Override
            public GetObjectRequest.Builder newGetRequestBuilder(String key) {
                return 
GetObjectRequest.builder().bucket(fs.getBucket()).key(key);
            }

            @Override
            public <T> CompletableFuture<T> submit(final CallableRaisingIOE<T> 
task) {
                return eval(task);
            }

            @Override
            public void close() {
            }
        };
    }

    private ResponseInputStream<GetObjectResponse> getMockedInputStream(
            GetObjectResponse response, boolean success, String input) {
        FilterInputStream stream = new 
FilterInputStream(AbortableInputStream.create(
                IOUtils.toInputStream(input, StandardCharsets.UTF_8), () -> {
                })) {
            @Override
            public void close() throws IOException {
                super.close();
                if (!success) {
                    throw new SocketException("Socket closed");
                }
            }
        };
        return new ResponseInputStream<>(response, stream);
    }

}

Now this -

[ERROR]   
TestReadFullyAndPositionalRead.testPositionalReadWithOffsetAndLength:136 
expected:<"[con]t"> but was:<"[tes]t">

is the failure: the stream is not honoring the position parameter and is
reading only the initial bytes of the object.

What is the expected behavior of the readFully method in S3AInputStream?





--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: common-dev-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-dev-h...@hadoop.apache.org

Reply via email to