xintongsong commented on a change in pull request #15599:
URL: https://github.com/apache/flink/pull/15599#discussion_r780739757



##########
File path: tools/ci/maven-utils.sh
##########
@@ -104,7 +104,7 @@ MVN_GLOBAL_OPTIONS_WITHOUT_MIRROR+="--no-snapshot-updates "
 # enable non-interactive batch mode
 MVN_GLOBAL_OPTIONS_WITHOUT_MIRROR+="-B "
 # globally control the build profile details
-MVN_GLOBAL_OPTIONS_WITHOUT_MIRROR+="$PROFILE "
+MVN_GLOBAL_OPTIONS_WITHOUT_MIRROR+="$PROFILE"

Review comment:
       This change is unrelated.

##########
File path: flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/storage/MockBlobStorage.java
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs.storage;
+
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.fs.gs.utils.BlobUtils;
+import org.apache.flink.fs.gs.utils.ChecksumUtils;
+
+import com.google.cloud.storage.StorageException;
+
+import javax.annotation.Nullable;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.channels.ClosedChannelException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/** Mock blob storage implementation, using in-memory structures. */
+public class MockBlobStorage implements GSBlobStorage {
+
+    /** Mock blob value with metadata. */
+    public static class BlobValue {
+
+        public final byte[] content;
+
+        public BlobValue(byte[] content) {
+            this.content = content;
+        }
+    }
+
+    /** The mock blob metadata. */
+    static class BlobMetadata implements GSBlobStorage.BlobMetadata {
+
+        private final BlobValue blobValue;
+
+        private final String forcedChecksum;
+
+        BlobMetadata(BlobValue blobValue, String forcedChecksum) {
+            this.blobValue = blobValue;
+            this.forcedChecksum = forcedChecksum;
+        }
+
+        @Override
+        public String getChecksum() {
+            if (forcedChecksum != null) {
+                return forcedChecksum;
+            } else {
+                int checksum = ChecksumUtils.CRC_HASH_FUNCTION.hashBytes(blobValue.content).asInt();
+                return ChecksumUtils.convertChecksumToString(checksum);
+            }
+        }
+    }
+
+    /** The mock write channel, which writes to the memory-based storage. */
+    public class WriteChannel implements GSBlobStorage.WriteChannel {
+
+        private final GSBlobIdentifier blobIdentifier;
+
+        @Nullable public final MemorySize chunkSize;
+
+        private final ByteArrayOutputStream stream;
+
+        private boolean closed;
+
+        WriteChannel(GSBlobIdentifier blobIdentifier, @Nullable MemorySize chunkSize) {
+            this.blobIdentifier = blobIdentifier;
+            this.stream = new ByteArrayOutputStream();
+            this.closed = false;
+            this.chunkSize = chunkSize;
+        }
+
+        @Override
+        public int write(byte[] content, int start, int length) throws IOException {
+            if (closed) {
+                throw new ClosedChannelException();
+            }
+
+            int writeCount = Math.min(length, maxWriteCount);
+            stream.write(content, start, writeCount);
+            return writeCount;
+        }
+
+        @Override
+        public void close() throws IOException {
+            if (!closed) {
+                stream.close();
+                blobs.put(blobIdentifier, new BlobValue(stream.toByteArray()));
+                closed = true;
+            }
+        }
+    }
+
+    /** The blobs in the mock storage. */
+    public final Map<GSBlobIdentifier, BlobValue> blobs;
+
+    /** Set this to force a checksum value to be returned. */
+    public String forcedChecksum;

Review comment:
       Here `@Nullable` as well.
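
    For example (`javax.annotation.Nullable` is already imported in this file):
    ```suggestion
        @Nullable public String forcedChecksum;
    ```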

##########
File path: flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/storage/MockBlobStorage.java
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs.storage;
+
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.fs.gs.utils.BlobUtils;
+import org.apache.flink.fs.gs.utils.ChecksumUtils;
+
+import com.google.cloud.storage.StorageException;
+
+import javax.annotation.Nullable;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.channels.ClosedChannelException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/** Mock blob storage implementation, using in-memory structures. */
+public class MockBlobStorage implements GSBlobStorage {
+
+    /** Mock blob value with metadata. */
+    public static class BlobValue {
+
+        public final byte[] content;
+
+        public BlobValue(byte[] content) {
+            this.content = content;
+        }
+    }
+
+    /** The mock blob metadata. */
+    static class BlobMetadata implements GSBlobStorage.BlobMetadata {
+
+        private final BlobValue blobValue;
+
+        private final String forcedChecksum;
+
+        BlobMetadata(BlobValue blobValue, String forcedChecksum) {
+            this.blobValue = blobValue;
+            this.forcedChecksum = forcedChecksum;
+        }
+
+        @Override
+        public String getChecksum() {
+            if (forcedChecksum != null) {
+                return forcedChecksum;
+            } else {
+                int checksum = ChecksumUtils.CRC_HASH_FUNCTION.hashBytes(blobValue.content).asInt();
+                return ChecksumUtils.convertChecksumToString(checksum);
+            }
+        }
+    }
+
+    /** The mock write channel, which writes to the memory-based storage. */
+    public class WriteChannel implements GSBlobStorage.WriteChannel {
+
+        private final GSBlobIdentifier blobIdentifier;
+
+        @Nullable public final MemorySize chunkSize;
+
+        private final ByteArrayOutputStream stream;
+
+        private boolean closed;
+
+        WriteChannel(GSBlobIdentifier blobIdentifier, @Nullable MemorySize chunkSize) {
+            this.blobIdentifier = blobIdentifier;
+            this.stream = new ByteArrayOutputStream();
+            this.closed = false;
+            this.chunkSize = chunkSize;
+        }
+
+        @Override
+        public int write(byte[] content, int start, int length) throws IOException {
+            if (closed) {
+                throw new ClosedChannelException();
+            }
+
+            int writeCount = Math.min(length, maxWriteCount);
+            stream.write(content, start, writeCount);
+            return writeCount;
+        }
+
+        @Override
+        public void close() throws IOException {
+            if (!closed) {
+                stream.close();
+                blobs.put(blobIdentifier, new BlobValue(stream.toByteArray()));
+                closed = true;
+            }
+        }
+    }
+
+    /** The blobs in the mock storage. */
+    public final Map<GSBlobIdentifier, BlobValue> blobs;
+
+    /** Set this to force a checksum value to be returned. */
+    public String forcedChecksum;
+
+    /** Set this to cause writes to be truncated at this length. */
+    public int maxWriteCount = Integer.MAX_VALUE;
+
+    public MockBlobStorage() {
+        this.blobs = new HashMap<>();
+    }
+
+    @Override
+    public GSBlobStorage.WriteChannel writeBlob(GSBlobIdentifier blobId) {
+        return new WriteChannel(blobId, null);
+    }
+
+    @Override
+    public GSBlobStorage.WriteChannel writeBlob(GSBlobIdentifier blobId, MemorySize chunkSize) {
+        return new WriteChannel(blobId, chunkSize);
+    }
+
+    @Override
+    public void createBlob(GSBlobIdentifier blobIdentifier) {
+        blobs.put(blobIdentifier, new BlobValue(new byte[0]));
+    }
+
+    @Override
+    public Optional<GSBlobStorage.BlobMetadata> getMetadata(GSBlobIdentifier blobIdentifier) {
+        BlobValue blobValue = blobs.get(blobIdentifier);
+        if (blobValue != null) {
+            return Optional.of(new BlobMetadata(blobValue, forcedChecksum));
+        } else {
+            return Optional.empty();
+        }
+    }
+
+    @Override
+    public List<GSBlobIdentifier> list(String bucketName, String prefix) {
+        return blobs.keySet().stream()
+                .filter(
+                        blobId ->
+                                blobId.bucketName.equals(bucketName)
+                                        && blobId.objectName.startsWith(prefix))
+                .collect(Collectors.toList());
+    }
+
+    @Override
+    public void copy(GSBlobIdentifier sourceBlobIdentifier, GSBlobIdentifier targetBlobIdentifier) {
+        BlobValue blobValue = blobs.get(sourceBlobIdentifier);
+        if (blobValue == null) {
+            throw new StorageException(404, "Copy source not found");
+        }
+        blobs.put(targetBlobIdentifier, blobValue);
+    }
+
+    @Override
+    public void compose(
+            List<GSBlobIdentifier> sourceBlobIdentifiers, GSBlobIdentifier targetBlobIdentifier) {
+
+        if (sourceBlobIdentifiers.size() > BlobUtils.COMPOSE_MAX_BLOBS) {
+            throw new UnsupportedOperationException();
+        }
+
+        try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) {
+
+            // write all the source blobs into the stream
+            for (GSBlobIdentifier blobIdentifier : sourceBlobIdentifiers) {
+                BlobValue sourceBlobValue = blobs.get(blobIdentifier);
+                if (sourceBlobValue == null) {
+                    throw new StorageException(404, "Compose source not found");
+                }
+                stream.write(sourceBlobValue.content);
+            }
+
+            // write the resulting blob
+            BlobValue targetBlobValue = new BlobValue(stream.toByteArray());
+            blobs.put(targetBlobIdentifier, targetBlobValue);
+
+        } catch (IOException ex) {
+            throw new RuntimeException(ex);
+        }
+    }

Review comment:
       Just to double-check: IIUC, this is aligned with the GCS behavior, i.e. calling `compose` does not delete the source blobs?
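
    For reference, a minimal sketch of the corresponding call against the real `google-cloud-storage` client (illustrative bucket/object names, not part of this PR):

    ```java
    // Sketch only, using com.google.cloud.storage.{Storage, BlobInfo, BlobId}:
    // with the real client, compose leaves the source blobs in place; removing
    // them afterwards is an explicit, separate delete.
    static void composeExample(Storage storage) {
        Storage.ComposeRequest request =
                Storage.ComposeRequest.newBuilder()
                        .setTarget(BlobInfo.newBuilder(BlobId.of("bucket", "target")).build())
                        .addSource("part-0") // sources must live in the target's bucket
                        .addSource("part-1")
                        .build();
        storage.compose(request);
        // "part-0" and "part-1" still exist at this point unless deleted explicitly
    }
    ```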

##########
File path: flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/writer/GSRecoverableWriterTest.java
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs.writer;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.fs.gs.GSFileSystemOptions;
+import org.apache.flink.fs.gs.storage.GSBlobIdentifier;
+import org.apache.flink.fs.gs.storage.MockBlobStorage;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/** Test {@link GSRecoverableWriter}. */
+@RunWith(Parameterized.class)
+public class GSRecoverableWriterTest {
+
+    @Parameterized.Parameter(value = 0)
+    public long position = 16;
+
+    @Parameterized.Parameter(value = 1)
+    public boolean closed = false;
+
+    @Parameterized.Parameter(value = 2)
+    public int componentCount;
+
+    @Parameterized.Parameters(name = "position={0}, closed={1}, componentCount={2}")
+    public static Collection<Object[]> data() {
+        return Arrays.asList(
+                new Object[][] {
+                    // position 0, not closed, component count = 0
+                    {0, false, 0},
+                    // position 16, not closed, component count = 2
+                    {16, false, 2},
+                    // position 0, closed, component count = 0
+                    {0, true, 0},
+                    // position 16, closed, component count = 2
+                    {16, true, 2},
+                });
+    }
+
+    private GSFileSystemOptions options;
+
+    private GSRecoverableWriter writer;
+
+    private List<UUID> componentObjectIds;
+
+    private GSResumeRecoverable resumeRecoverable;
+
+    private GSCommitRecoverable commitRecoverable;
+
+    private GSBlobIdentifier blobIdentifier;
+
+    @Before
+    public void before() {
+        MockBlobStorage storage = new MockBlobStorage();
+        blobIdentifier = new GSBlobIdentifier("foo", "bar");
+
+        Configuration flinkConfig = new Configuration();
+        options = new GSFileSystemOptions(flinkConfig);
+        writer = new GSRecoverableWriter(storage, options);
+
+        componentObjectIds = new ArrayList<UUID>();
+        for (int i = 0; i < componentCount; i++) {
+            componentObjectIds.add(UUID.randomUUID());
+        }
+
+        resumeRecoverable =
+                new GSResumeRecoverable(blobIdentifier, componentObjectIds, position, closed);
+        commitRecoverable = new GSCommitRecoverable(blobIdentifier, componentObjectIds);
+    }
+
+    @Test
+    public void testRequiresCleanupOfRecoverableState() {
+        assertFalse(writer.requiresCleanupOfRecoverableState());
+    }
+
+    @Test
+    public void testSupportsResume() {
+        assertTrue(writer.supportsResume());
+    }
+
+    @Test
+    public void testOpen() throws IOException {
+        Path path = new Path("gs://foo/bar");
+        GSRecoverableFsDataOutputStream stream =
+                (GSRecoverableFsDataOutputStream) writer.open(path);
+        assertEquals("foo", stream.finalBlobIdentifier.bucketName);
+        assertEquals("bar", stream.finalBlobIdentifier.objectName);
+        assertEquals(options, stream.options);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testOpenWithEmptyBucketName() throws IOException {
+        Path path = new Path("gs:///bar");
+        writer.open(path);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testOpenWithEmptyObjectName() throws IOException {
+        Path path = new Path("gs://foo/");
+        writer.open(path);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testOpenWithMissingObjectName() throws IOException {
+        Path path = new Path("gs://foo");
+        writer.open(path);
+    }
+
+    @Test
+    public void testCleanupRecoverableState() {
+        assertTrue(writer.cleanupRecoverableState(resumeRecoverable));
+    }
+
+    @Test
+    public void testRecover() {
+        GSRecoverableFsDataOutputStream stream =
+                (GSRecoverableFsDataOutputStream) writer.recover(resumeRecoverable);
+        assertEquals(position, stream.position);
+        assertEquals(closed, stream.closed);
+        assertEquals(options, stream.options);
+        assertEquals(blobIdentifier, stream.finalBlobIdentifier);
+        assertArrayEquals(componentObjectIds.toArray(), stream.componentObjectIds.toArray());

Review comment:
       Same here about verifying the `GSRecoverableFsDataOutputStream` internal states.

##########
File path: flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/storage/MockBlobStorage.java
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs.storage;
+
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.fs.gs.utils.BlobUtils;
+import org.apache.flink.fs.gs.utils.ChecksumUtils;
+
+import com.google.cloud.storage.StorageException;
+
+import javax.annotation.Nullable;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.channels.ClosedChannelException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/** Mock blob storage implementation, using in-memory structures. */
+public class MockBlobStorage implements GSBlobStorage {
+
+    /** Mock blob value with metadata. */
+    public static class BlobValue {
+
+        public final byte[] content;
+
+        public BlobValue(byte[] content) {
+            this.content = content;
+        }
+    }
+
+    /** The mock blob metadata. */
+    static class BlobMetadata implements GSBlobStorage.BlobMetadata {
+
+        private final BlobValue blobValue;
+
+        private final String forcedChecksum;
+
+        BlobMetadata(BlobValue blobValue, String forcedChecksum) {
+            this.blobValue = blobValue;
+            this.forcedChecksum = forcedChecksum;
+        }
+
+        @Override
+        public String getChecksum() {
+            if (forcedChecksum != null) {
+                return forcedChecksum;
+            } else {
+                int checksum = ChecksumUtils.CRC_HASH_FUNCTION.hashBytes(blobValue.content).asInt();
+                return ChecksumUtils.convertChecksumToString(checksum);
+            }
+        }
+    }
+
+    /** The mock write channel, which writes to the memory-based storage. */
+    public class WriteChannel implements GSBlobStorage.WriteChannel {
+
+        private final GSBlobIdentifier blobIdentifier;
+
+        @Nullable public final MemorySize chunkSize;

Review comment:
       Why is this `public`?

##########
File path: flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/utils/BlobUtils.java
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs.utils;
+
+import org.apache.flink.fs.gs.GSFileSystemFactory;
+import org.apache.flink.fs.gs.GSFileSystemOptions;
+import org.apache.flink.fs.gs.storage.GSBlobIdentifier;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.util.UUID;
+
+/** Utility functions related to blobs. */
+public class BlobUtils {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(BlobUtils.class);

Review comment:
       This is never used.

##########
File path: flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/storage/MockBlobStorage.java
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs.storage;
+
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.fs.gs.utils.BlobUtils;
+import org.apache.flink.fs.gs.utils.ChecksumUtils;
+
+import com.google.cloud.storage.StorageException;
+
+import javax.annotation.Nullable;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.channels.ClosedChannelException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/** Mock blob storage implementation, using in-memory structures. */
+public class MockBlobStorage implements GSBlobStorage {
+
+    /** Mock blob value with metadata. */
+    public static class BlobValue {
+
+        public final byte[] content;
+
+        public BlobValue(byte[] content) {
+            this.content = content;
+        }
+    }
+
+    /** The mock blob metadata. */
+    static class BlobMetadata implements GSBlobStorage.BlobMetadata {
+
+        private final BlobValue blobValue;
+
+        private final String forcedChecksum;
+
+        BlobMetadata(BlobValue blobValue, String forcedChecksum) {
+            this.blobValue = blobValue;
+            this.forcedChecksum = forcedChecksum;
+        }
+
+        @Override
+        public String getChecksum() {
+            if (forcedChecksum != null) {

Review comment:
       If `forcedChecksum` is expected to be `null` sometimes, we'd better annotate it with `@Nullable` explicitly.
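
    For instance (a sketch; `javax.annotation.Nullable` is already imported in this file):

    ```java
    @Nullable private final String forcedChecksum;

    BlobMetadata(BlobValue blobValue, @Nullable String forcedChecksum) {
        this.blobValue = blobValue;
        this.forcedChecksum = forcedChecksum;
    }
    ```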

##########
File path: flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/utils/BlobUtilsTest.java
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs.utils;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.fs.gs.GSFileSystemOptions;
+import org.apache.flink.fs.gs.storage.GSBlobIdentifier;
+
+import org.junit.Test;
+
+import java.net.URI;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+
+/** Test {@link BlobUtils}. */
+public class BlobUtilsTest {
+
+    @Test
+    public void shouldParseValidUri() {
+        GSBlobIdentifier blobIdentifier = BlobUtils.parseUri(URI.create("gs://bucket/foo/bar"));
+        assertEquals("bucket", blobIdentifier.bucketName);
+        assertEquals("foo/bar", blobIdentifier.objectName);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void shouldFailToParseUriBadScheme() {
+        BlobUtils.parseUri(URI.create("s3://bucket/foo/bar"));
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void shouldFailToParseUriMissingBucketName() {
+        BlobUtils.parseUri(URI.create("gs:///foo/bar"));
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void shouldFailToParseUriMissingObjectName() {
+        BlobUtils.parseUri(URI.create("gs://bucket/"));
+    }
+
+    @Test
+    public void shouldUseTemporaryBucketNameIfSpecified() {
+        Configuration flinkConfig = new Configuration();
+        flinkConfig.setString("gs.writer.temporary.bucket.name", "temp");

Review comment:
       ```suggestion
           flinkConfig.set(GSFileSystemOptions.WRITER_TEMPORARY_BUCKET_NAME, "temp");
       ```

##########
File path: flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/writer/GSCommitRecoverableTest.java
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs.writer;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.fs.gs.GSFileSystemOptions;
+import org.apache.flink.fs.gs.storage.GSBlobIdentifier;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+
+/** Test {@link GSResumeRecoverable}. */
+@RunWith(Parameterized.class)
+public class GSCommitRecoverableTest {
+
+    @Parameterized.Parameter(value = 0)
+    public List<UUID> componentObjectIds;
+
+    @Parameterized.Parameter(value = 1)
+    public String temporaryBucketName;
+
+    @Parameterized.Parameters(name = "componentObjectIds={0}, temporaryBucketName={1}")
+    public static Collection<Object[]> data() {
+
+        ArrayList<UUID> emptyComponentObjectIds = new ArrayList<>();
+        ArrayList<UUID> populatedComponentObjectIds = new ArrayList<>();
+        for (int i = 0; i < 2; i++) {
+            populatedComponentObjectIds.add(UUID.randomUUID());
+        }
+        GSBlobIdentifier blobIdentifier = new GSBlobIdentifier("foo", "bar");
+
+        return Arrays.asList(
+                new Object[][] {
+                    // no component ids with no temporary bucket specified
+                    {emptyComponentObjectIds, null},
+                    // no component ids with a temporary bucket specified
+                    {emptyComponentObjectIds, "temporary-bucket"},
+                    // populated component ids with no temporary bucket specified
+                    {populatedComponentObjectIds, null},
+                    //  populated component ids with temporary bucket specified
+                    {populatedComponentObjectIds, "temporary-bucket"},
+                });
+    }
+
+    private GSBlobIdentifier blobIdentifier;
+
+    @Before
+    public void before() {
+        blobIdentifier = new GSBlobIdentifier("foo", "bar");
+    }
+
+    @Test
+    public void shouldConstructProperly() {
+        GSCommitRecoverable commitRecoverable =
+                new GSCommitRecoverable(blobIdentifier, componentObjectIds);
+        assertEquals(blobIdentifier, commitRecoverable.finalBlobIdentifier);
+        assertEquals(componentObjectIds, commitRecoverable.componentObjectIds);
+    }
+
+    /** Ensure that the list of component object ids cannot be added to. */
+    @Test(expected = UnsupportedOperationException.class)
+    public void shouldNotAddComponentId() {
+        GSCommitRecoverable commitRecoverable =
+                new GSCommitRecoverable(blobIdentifier, componentObjectIds);
+        commitRecoverable.componentObjectIds.add(UUID.randomUUID());
+    }
+
+    /** Ensure that component object ids can't be updated. */
+    @Test(expected = UnsupportedOperationException.class)
+    public void shouldNotModifyComponentId() {
+        GSCommitRecoverable commitRecoverable =
+                new GSCommitRecoverable(blobIdentifier, componentObjectIds);
+        commitRecoverable.componentObjectIds.set(0, UUID.randomUUID());
+    }
+
+    @Test
+    public void shouldGetComponentBlobIds() {
+
+        // configure options, if this test configuration has a temporary bucket name, set it
+        Configuration flinkConfig = new Configuration();
+        if (temporaryBucketName != null) {
+            flinkConfig.setString("gs.writer.temporary.bucket.name", temporaryBucketName);

Review comment:
       Same here for the configuration key.
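
    i.e., assuming the same option constant as in the suggestion above:

    ```suggestion
                flinkConfig.set(GSFileSystemOptions.WRITER_TEMPORARY_BUCKET_NAME, temporaryBucketName);
    ```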

##########
File path: flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/writer/GSRecoverableWriterTest.java
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs.writer;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.fs.gs.GSFileSystemOptions;
+import org.apache.flink.fs.gs.storage.GSBlobIdentifier;
+import org.apache.flink.fs.gs.storage.MockBlobStorage;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/** Test {@link GSRecoverableWriter}. */
+@RunWith(Parameterized.class)
+public class GSRecoverableWriterTest {
+
+    @Parameterized.Parameter(value = 0)
+    public long position = 16;
+
+    @Parameterized.Parameter(value = 1)
+    public boolean closed = false;
+
+    @Parameterized.Parameter(value = 2)
+    public int componentCount;
+
+    @Parameterized.Parameters(name = "position={0}, closed={1}, componentCount={2}")
+    public static Collection<Object[]> data() {
+        return Arrays.asList(
+                new Object[][] {
+                    // position 0, not closed, component count = 0
+                    {0, false, 0},
+                    // position 16, not closed, component count = 2
+                    {16, false, 2},
+                    // position 0, closed, component count = 0
+                    {0, true, 0},
+                    // position 16, closed, component count = 2
+                    {16, true, 2},
+                });
+    }
+
+    private GSFileSystemOptions options;
+
+    private GSRecoverableWriter writer;
+
+    private List<UUID> componentObjectIds;
+
+    private GSResumeRecoverable resumeRecoverable;
+
+    private GSCommitRecoverable commitRecoverable;
+
+    private GSBlobIdentifier blobIdentifier;
+
+    @Before
+    public void before() {
+        MockBlobStorage storage = new MockBlobStorage();
+        blobIdentifier = new GSBlobIdentifier("foo", "bar");
+
+        Configuration flinkConfig = new Configuration();
+        options = new GSFileSystemOptions(flinkConfig);
+        writer = new GSRecoverableWriter(storage, options);
+
+        componentObjectIds = new ArrayList<UUID>();
+        for (int i = 0; i < componentCount; i++) {
+            componentObjectIds.add(UUID.randomUUID());
+        }
+
+        resumeRecoverable =
+                new GSResumeRecoverable(blobIdentifier, componentObjectIds, position, closed);
+        commitRecoverable = new GSCommitRecoverable(blobIdentifier, componentObjectIds);
+    }
+
+    @Test
+    public void testRequiresCleanupOfRecoverableState() {
+        assertFalse(writer.requiresCleanupOfRecoverableState());
+    }
+
+    @Test
+    public void testSupportsResume() {
+        assertTrue(writer.supportsResume());
+    }
+
+    @Test
+    public void testOpen() throws IOException {
+        Path path = new Path("gs://foo/bar");
+        GSRecoverableFsDataOutputStream stream =
+                (GSRecoverableFsDataOutputStream) writer.open(path);
+        assertEquals("foo", stream.finalBlobIdentifier.bucketName);
+        assertEquals("bar", stream.finalBlobIdentifier.objectName);
+        assertEquals(options, stream.options);

Review comment:
       I'd suggest not exposing these `GSRecoverableFsDataOutputStream` internal states. It would be better to test against the contract rather than the internal state.
   
    I guess the best proof that the writer is properly opened is that we can successfully write data to the storage, which is probably already covered by other cases. Thus, I'd be fine with omitting this one.
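
    If we do want a dedicated test for `open`, a contract-style sketch could look like this (assumptions: the `MockBlobStorage` from `before()` is kept in a field, here called `storage`; `org.apache.flink.core.fs.RecoverableFsDataOutputStream` is imported; the test data is made up):

    ```java
    @Test
    public void testOpenThenWriteAndCommit() throws IOException {
        RecoverableFsDataOutputStream stream = writer.open(new Path("gs://foo/bar"));
        byte[] data = new byte[] {1, 2, 3, 4};
        stream.write(data);
        stream.closeForCommit().commit();

        // assert on the storage contract, not on the stream's internal fields
        MockBlobStorage.BlobValue blob = storage.blobs.get(new GSBlobIdentifier("foo", "bar"));
        assertArrayEquals(data, blob.content);
    }
    ```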

##########
File path: flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/utils/BlobUtilsTest.java
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs.utils;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.fs.gs.GSFileSystemOptions;
+import org.apache.flink.fs.gs.storage.GSBlobIdentifier;
+
+import org.junit.Test;
+
+import java.net.URI;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+
+/** Test {@link BlobUtils}. */
+public class BlobUtilsTest {
+
+    @Test
+    public void shouldParseValidUri() {
+        GSBlobIdentifier blobIdentifier = BlobUtils.parseUri(URI.create("gs://bucket/foo/bar"));
+        assertEquals("bucket", blobIdentifier.bucketName);
+        assertEquals("foo/bar", blobIdentifier.objectName);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void shouldFailToParseUriBadScheme() {
+        BlobUtils.parseUri(URI.create("s3://bucket/foo/bar"));
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void shouldFailToParseUriMissingBucketName() {
+        BlobUtils.parseUri(URI.create("gs:///foo/bar"));
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void shouldFailToParseUriMissingObjectName() {
+        BlobUtils.parseUri(URI.create("gs://bucket/"));
+    }
+
+    @Test
+    public void shouldUseTemporaryBucketNameIfSpecified() {
+        Configuration flinkConfig = new Configuration();
+        flinkConfig.setString("gs.writer.temporary.bucket.name", "temp");
+        GSFileSystemOptions options = new GSFileSystemOptions(flinkConfig);
+        GSBlobIdentifier identifier = new GSBlobIdentifier("foo", "bar");
+
+        String bucketName = BlobUtils.getTemporaryBucketName(identifier, options);
+        assertEquals("temp", bucketName);
+    }
+
+    @Test
+    public void shouldUseIdentifierBucketNameNameIfTemporaryBucketNotSpecified() {
+        Configuration flinkConfig = new Configuration();
+        GSFileSystemOptions options = new GSFileSystemOptions(flinkConfig);
+        GSBlobIdentifier identifier = new GSBlobIdentifier("foo", "bar");
+
+        String bucketName = BlobUtils.getTemporaryBucketName(identifier, options);
+        assertEquals("foo", bucketName);
+    }
+
+    @Test
+    public void shouldProperlyConstructTemporaryObjectPartialName() {
+        GSBlobIdentifier identifier = new GSBlobIdentifier("foo", "bar");
+
+        String partialName = BlobUtils.getTemporaryObjectPartialName(identifier);
+        assertEquals(".inprogress/foo/bar/", partialName);
+    }
+
+    @Test
+    public void shouldProperlyConstructTemporaryObjectName() {
+        GSBlobIdentifier identifier = new GSBlobIdentifier("foo", "bar");
+        UUID temporaryObjectId = UUID.fromString("f09c43e5-ea49-4537-a406-0586f8f09d47");
+
+        String partialName = BlobUtils.getTemporaryObjectName(identifier, temporaryObjectId);
+        assertEquals(".inprogress/foo/bar/f09c43e5-ea49-4537-a406-0586f8f09d47", partialName);
+    }
+
+    @Test
+    public void shouldProperlyConstructTemporaryBlobIdentifierWithDefaultBucket() {
+        Configuration flinkConfig = new Configuration();
+        GSFileSystemOptions options = new GSFileSystemOptions(flinkConfig);
+        GSBlobIdentifier identifier = new GSBlobIdentifier("foo", "bar");
+        UUID temporaryObjectId = UUID.fromString("f09c43e5-ea49-4537-a406-0586f8f09d47");
+
+        GSBlobIdentifier temporaryBlobIdentifier =
+                BlobUtils.getTemporaryBlobIdentifier(identifier, temporaryObjectId, options);
+        assertEquals("foo", temporaryBlobIdentifier.bucketName);
+        assertEquals(
+                ".inprogress/foo/bar/f09c43e5-ea49-4537-a406-0586f8f09d47",
+                temporaryBlobIdentifier.objectName);
+    }
+
+    @Test
+    public void shouldProperlyConstructTemporaryBlobIdentifierWithTemporaryBucket() {
+        Configuration flinkConfig = new Configuration();
+        flinkConfig.setString("gs.writer.temporary.bucket.name", "temp");

Review comment:
       Same here for the config key.



