rkhachatryan commented on code in PR #28268:
URL: https://github.com/apache/flink/pull/28268#discussion_r3376369027


##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3RecoverableWriter.java:
##########
@@ -84,15 +87,93 @@ public RecoverableFsDataOutputStream.Committer recoverForCommit(CommitRecoverabl
     @Override
     public RecoverableFsDataOutputStream recover(ResumeRecoverable recoverable) throws IOException {
         checkNotClosed();
-        NativeS3Recoverable s3recoverable = castToNativeS3Recoverable(recoverable);
-        return new NativeS3RecoverableFsDataOutputStream(
-                s3AccessHelper,
-                s3recoverable.getObjectName(),
-                s3recoverable.uploadId(),
-                localTmpDir,
-                userDefinedMinPartSize,
-                s3recoverable.parts(),
-                s3recoverable.numBytesInParts());
+        final NativeS3Recoverable s3recoverable = castToNativeS3Recoverable(recoverable);
+
+        File incompleteTail = null;
+        long incompleteTailLength = 0L;
+        if (s3recoverable.incompleteObjectName() != null) {
+            incompleteTail = downloadIncompleteTail(s3recoverable);
+            incompleteTailLength = s3recoverable.incompleteObjectLength();

Review Comment:
   Why do we need to pass `incompleteTailLength` around in addition to `incompleteTail`?
   
   As I understand it, `incompleteTail` is at this point a local file with a fixed, known length, so the file (or its name) alone should be enough, shouldn't it?
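
To illustrate the point, here is a minimal sketch of deriving the length from the local file instead of passing it separately. The class `TailLengthSketch` and its methods are hypothetical and not part of the PR; the only real API used is `java.io.File#length()`.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;

// Hypothetical sketch, not code from the PR.
class TailLengthSketch {

    /** Returns the size of the downloaded tail, or 0 when there is no tail. */
    static long tailLength(File incompleteTail) {
        // File#length() reads the size from the file system, so no separate
        // length parameter is needed once the tail is a local file.
        return incompleteTail == null ? 0L : incompleteTail.length();
    }

    /** Writes a 5-byte temp file and returns the length derived from it. */
    static long demoLength() {
        try {
            File tail = File.createTempFile("incomplete-tail", ".tmp");
            tail.deleteOnExit();
            Files.write(tail.toPath(), new byte[] {1, 2, 3, 4, 5});
            return tailLength(tail);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demoLength());
        System.out.println(tailLength(null));
    }
}
```

One caveat: `File#length()` returns 0 for a missing file, so if the recorded `incompleteObjectLength()` is kept at all, it may be more useful as a consistency check against the downloaded file than as a separate constructor argument.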



##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3RecoverableWriter.java:
##########
@@ -84,15 +87,93 @@ public RecoverableFsDataOutputStream.Committer recoverForCommit(CommitRecoverabl
     @Override
     public RecoverableFsDataOutputStream recover(ResumeRecoverable recoverable) throws IOException {
         checkNotClosed();
-        NativeS3Recoverable s3recoverable = castToNativeS3Recoverable(recoverable);
-        return new NativeS3RecoverableFsDataOutputStream(
-                s3AccessHelper,
-                s3recoverable.getObjectName(),
-                s3recoverable.uploadId(),
-                localTmpDir,
-                userDefinedMinPartSize,
-                s3recoverable.parts(),
-                s3recoverable.numBytesInParts());
+        final NativeS3Recoverable s3recoverable = castToNativeS3Recoverable(recoverable);
+
+        File incompleteTail = null;
+        long incompleteTailLength = 0L;
+        if (s3recoverable.incompleteObjectName() != null) {

Review Comment:
   Should we validate inside the `if` branch that the incomplete object's length is greater than 0?
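
A minimal sketch of the kind of check meant here. `TailValidationSketch` and `checkedTailLength` are hypothetical names, and the parameters stand in for `incompleteObjectName()` and `incompleteObjectLength()`. In the Flink code base the same check could presumably be written with `Preconditions.checkState`; plain Java is used here to keep the example self-contained.

```java
// Hypothetical sketch, not code from the PR.
class TailValidationSketch {

    /**
     * Returns the tail length to use: 0 when no incomplete object was
     * recorded, otherwise the recorded length, which must be positive.
     */
    static long checkedTailLength(String incompleteObjectName, long incompleteObjectLength) {
        if (incompleteObjectName == null) {
            return 0L;
        }
        // An incomplete object with a non-positive length indicates a
        // corrupted or inconsistent recoverable, so fail fast.
        if (incompleteObjectLength <= 0) {
            throw new IllegalStateException(
                    "Incomplete object "
                            + incompleteObjectName
                            + " has non-positive length: "
                            + incompleteObjectLength);
        }
        return incompleteObjectLength;
    }

    public static void main(String[] args) {
        System.out.println(checkedTailLength(null, 0L));
        System.out.println(checkedTailLength("tail-object", 7L));
    }
}
```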



##########
flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/writer/NativeS3RecoverableWriterRecoveryTest.java:
##########
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3native.writer;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link NativeS3RecoverableWriter#recover}. */
+class NativeS3RecoverableWriterRecoveryTest {
+
+    private static final String BUCKET = InMemoryNativeS3Operations.DEFAULT_BUCKET;
+    private static final String KEY = "out.txt";
+    private static final long MIN_PART_SIZE = 10L;
+
+    @TempDir java.nio.file.Path tmp;
+
+    @Test
+    void persistThenRecoverPreservesTailBytes() throws Exception {

Review Comment:
   We should probably also test that a second recovery **_attempt_** can still read the side part.
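
Roughly the property such a test would assert, sketched against a stand-in store. `SecondRecoverySketch` is hypothetical and does not use `InMemoryNativeS3Operations` or the real writer; it only models the assumption that recovery copies the side part and leaves the stored object in place.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, not code from the PR.
class SecondRecoverySketch {

    private final Map<String, byte[]> objects = new HashMap<>();

    void put(String key, byte[] data) {
        objects.put(key, data.clone());
    }

    /** Models recover(): copies the side part without deleting it. */
    byte[] recoverTail(String key) {
        byte[] stored = objects.get(key);
        if (stored == null) {
            throw new IllegalStateException("Side part missing: " + key);
        }
        return stored.clone();
    }

    /** Recovers twice from the same recoverable and compares the results. */
    static boolean secondAttemptSeesSameTail() {
        SecondRecoverySketch store = new SecondRecoverySketch();
        store.put("out.txt.tail", new byte[] {10, 20, 30});
        byte[] first = store.recoverTail("out.txt.tail");
        // Mutating the first copy must not affect what a later attempt reads.
        first[0] = 99;
        byte[] second = store.recoverTail("out.txt.tail");
        return Arrays.equals(second, new byte[] {10, 20, 30});
    }

    public static void main(String[] args) {
        System.out.println(secondAttemptSeesSameTail());
    }
}
```

In the real test, the equivalent would be calling `recover` twice with the same recoverable and checking that both resulting streams preserve the tail bytes.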



##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3RecoverableFsDataOutputStream.java:
##########


Review Comment:
   This constructor (the one passing `null`/`0L`) appears to be unused now.



##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3RecoverableWriter.java:
##########
@@ -84,15 +87,93 @@ public RecoverableFsDataOutputStream.Committer recoverForCommit(CommitRecoverabl
     @Override
     public RecoverableFsDataOutputStream recover(ResumeRecoverable recoverable) throws IOException {
         checkNotClosed();
-        NativeS3Recoverable s3recoverable = castToNativeS3Recoverable(recoverable);
-        return new NativeS3RecoverableFsDataOutputStream(
-                s3AccessHelper,
-                s3recoverable.getObjectName(),
-                s3recoverable.uploadId(),
-                localTmpDir,
-                userDefinedMinPartSize,
-                s3recoverable.parts(),
-                s3recoverable.numBytesInParts());
+        final NativeS3Recoverable s3recoverable = castToNativeS3Recoverable(recoverable);
+
+        File incompleteTail = null;
+        long incompleteTailLength = 0L;
+        if (s3recoverable.incompleteObjectName() != null) {
+            incompleteTail = downloadIncompleteTail(s3recoverable);
+            incompleteTailLength = s3recoverable.incompleteObjectLength();
+        }
+
+        try {
+            LOG.debug(
+                    "Resuming stream - key: {}, uploadId: {}, parts: {}, bytesInParts: {}, incompleteTail: {} ({} bytes)",
+                    s3recoverable.getObjectName(),
+                    s3recoverable.uploadId(),
+                    s3recoverable.parts().size(),
+                    s3recoverable.numBytesInParts(),
+                    s3recoverable.incompleteObjectName(),
+                    incompleteTailLength);
+            return new NativeS3RecoverableFsDataOutputStream(
+                    s3AccessHelper,
+                    s3recoverable.getObjectName(),
+                    s3recoverable.uploadId(),
+                    localTmpDir,
+                    userDefinedMinPartSize,
+                    s3recoverable.parts(),
+                    s3recoverable.numBytesInParts(),
+                    incompleteTail,
+                    incompleteTailLength);
+        } catch (Throwable t) {

Review Comment:
   Should we catch `Exception` here instead of `Throwable`?
   I doubt we want this block to run on VM errors, for example.
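
A small sketch of the difference, using hypothetical names (`CatchScopeSketch`, `runWithCleanup`). With `catch (Exception e)`, ordinary failures still reach the handler, while `Error` subclasses such as `OutOfMemoryError` propagate untouched. A plain `Error` is thrown here to simulate a VM error.

```java
// Hypothetical sketch, not code from the PR.
class CatchScopeSketch {

    /** Runs the body; the handler only sees Exception, never Error. */
    static String runWithCleanup(Runnable body) {
        try {
            body.run();
            return "ok";
        } catch (Exception e) {
            // Cleanup for recoverable failures would go here.
            return "cleaned up after " + e.getClass().getSimpleName();
        }
    }

    /** Returns true if a simulated VM error skips the handler above. */
    static boolean errorBypassesHandler() {
        try {
            runWithCleanup(() -> {
                throw new Error("simulated VM error");
            });
            return false;
        } catch (Error e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(runWithCleanup(() -> {}));
        System.out.println(runWithCleanup(() -> {
            throw new IllegalStateException("boom");
        }));
        System.out.println(errorBypassesHandler());
    }
}
```

The trade-off is that whatever cleanup the block performs (not visible in the quoted hunk) would then be skipped on an `Error`, which is usually acceptable because the JVM is unlikely to be in a recoverable state at that point.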



