fredia commented on code in PR #21812:
URL: https://github.com/apache/flink/pull/21812#discussion_r1100920557


##########
flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterTest.java:
##########
@@ -106,6 +109,143 @@ void testPersistAgain() throws Exception {
                 });
     }
 
+    @Test
+    void testChangelogFileAvailable() throws Exception {
+        long appendPersistThreshold = 100;
+
+        TaskChangelogRegistry taskChangelogRegistry =
+                new TaskChangelogRegistryImpl(Executors.directExecutor());
+
+        try (DiscardRecordableStateChangeUploader uploader =
+                        new DiscardRecordableStateChangeUploader(taskChangelogRegistry);
+                TestingBatchingUploadScheduler uploadScheduler =
+                        new TestingBatchingUploadScheduler(uploader);
+                FsStateChangelogWriter writer =
+                        new FsStateChangelogWriter(
+                                UUID.randomUUID(),
+                                KeyGroupRange.of(KEY_GROUP, KEY_GROUP),
+                                uploadScheduler,
+                                appendPersistThreshold,
+                                new SyncMailboxExecutor(),
+                                taskChangelogRegistry,
+                                TestLocalRecoveryConfig.disabled(),
+                                LocalChangelogRegistry.NO_OP)) {
+            SequenceNumber initialSqn = writer.initialSequenceNumber();
+
+            writer.append(KEY_GROUP, getBytes(10)); // sqn: 0
+
+            // checkpoint 1 trigger
+            SequenceNumber checkpoint1sqn = writer.nextSequenceNumber();
+            writer.persist(initialSqn);
+            uploadScheduler.scheduleAll(); // checkpoint 1 completed
+            writer.confirm(initialSqn, checkpoint1sqn, 1);
+
+            writer.append(KEY_GROUP, getBytes(10)); // sqn: 1
+
+            // materialization 1 trigger
+            SequenceNumber materializationSqn = writer.nextSequenceNumber();
+
+            writer.append(KEY_GROUP, getBytes(10)); // sqn: 2
+
+            // materialization 1 completed
+            // checkpoint 2 trigger
+            SequenceNumber checkpoint2sqn = writer.nextSequenceNumber();
+            writer.persist(materializationSqn);
+            uploadScheduler.scheduleAll(); // checkpoint 2 completed
+            writer.confirm(materializationSqn, checkpoint2sqn, 2);
+
+            // checkpoint 1 subsumed
+            writer.truncate(
+                    materializationSqn.compareTo(checkpoint1sqn) < 0
+                            ? materializationSqn
+                            : checkpoint1sqn);
+
+            writer.append(KEY_GROUP, getBytes(10)); // sqn: 3
+
+            // checkpoint 3 trigger
+            SequenceNumber checkpoint3sqn = writer.nextSequenceNumber();
+            writer.persist(materializationSqn);
+            uploadScheduler.scheduleAll(); // checkpoint 3 completed
+            writer.confirm(materializationSqn, checkpoint3sqn, 3);
+
+            // trigger pre-emptive upload
+            writer.append(KEY_GROUP, getBytes(100)); // sqn: 4
+            uploadScheduler.scheduleAll();
+
+            // checkpoint 2 subsumed
+            writer.truncate(
+                    materializationSqn.compareTo(checkpoint2sqn) < 0
+                            ? materializationSqn
+                            : checkpoint2sqn);
+
+            // checkpoint 4 trigger
+            SequenceNumber checkpoint4sqn = writer.nextSequenceNumber();
+            CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> future =
+                    writer.persist(materializationSqn);
+            uploadScheduler.scheduleAll(); // checkpoint 4 completed
+            writer.confirm(materializationSqn, checkpoint4sqn, 4);
+
+            SnapshotResult<ChangelogStateHandleStreamImpl> result = future.get();
+            ChangelogStateHandleStreamImpl resultHandle = result.getJobManagerOwnedSnapshot();
+
+            for (Tuple2<StreamStateHandle, Long> handleAndOffset :
+                    resultHandle.getHandlesAndOffsets()) {
+                assertThat(uploader.isDiscarded(handleAndOffset.f0))
+                        .isFalse(); // no handle should be discarded
+            }
+        }
+    }
+
+    @Test
+    void testChangelogFileAvailableAgain() throws Exception {

Review Comment:
   How about renaming this to `testFileAvailableAfterClose`?



##########
flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterTest.java:
##########
@@ -106,6 +109,143 @@ void testPersistAgain() throws Exception {
                 });
     }
 
+    @Test
+    void testChangelogFileAvailable() throws Exception {

Review Comment:
   How about moving this test to https://github.com/apache/flink/pull/21895, and renaming it to `testFileAvailableAfterPreUpload`?



##########
flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateDiscardTest.java:
##########
@@ -420,17 +424,45 @@ public void upload(UploadTask uploadTask) throws IOException {
          */
         public List<UploadResult> completeUploads(
                 Function<UploadTask, List<UploadResult>> resultsProvider) {
+
             List<UploadResult> allResults = new ArrayList<>();
+            List<Tuple2<UploadTask, List<UploadResult>>> taskResults = new ArrayList<>();
             uploads.forEach(
                     task -> {
                         List<UploadResult> results = resultsProvider.apply(task);
-                        for (UploadResult result : results) {
-                            startTracking(registry, result.getStreamStateHandle(), task);
-                        }
+                        taskResults.add(Tuple2.of(task, results));
                         allResults.addAll(results);
+                    });
+
+            Map<PhysicalStateHandleID, Long> stateIdAndRefCounts = new HashMap<>();
+            Map<PhysicalStateHandleID, StreamStateHandle> stateIdAndHandles = new HashMap<>();
+            for (UploadResult result : allResults) {
+                StreamStateHandle handle = result.getStreamStateHandle();
+                PhysicalStateHandleID handleID = handle.getStreamStateHandleID();
+                stateIdAndRefCounts.compute(
+                        handleID,
+                        (id, oldRefCount) -> {
+                            if (oldRefCount == null) {
+                                return 1L;
+                            }
+                            return oldRefCount + 1;
+                        });
+                stateIdAndHandles.put(handleID, handle);
+            }
+
+            stateIdAndRefCounts.forEach(
+                    ((handleID, refCount) -> {
+                        registry.startTracking(stateIdAndHandles.get(handleID), refCount);
+                    }));

Review Comment:
   ```suggestion
            Map<StreamStateHandle, Long> stateHandleAndRefCounts =
                    allResults.stream()
                            .collect(groupingBy(UploadResult::getStreamStateHandle, summingLong(x -> 1L)));
            stateHandleAndRefCounts.forEach(
                    (handle, refCount) -> registry.startTracking(handle, refCount));
   ```
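
   A side note on this suggestion: it appears to assume static imports of `java.util.stream.Collectors.groupingBy` and `java.util.stream.Collectors.summingLong`, and it keys the map on handle equality, which is what lets the intermediate `stateIdAndRefCounts`/`stateIdAndHandles` maps go away. A standalone sketch of the counting idiom, with plain strings standing in for `StreamStateHandle`:

   ```java
   import java.util.Arrays;
   import java.util.List;
   import java.util.Map;

   import static java.util.stream.Collectors.groupingBy;
   import static java.util.stream.Collectors.summingLong;

   /** Demo of the grouping idiom; strings stand in for StreamStateHandle. */
   public class RefCountGroupingDemo {
       public static void main(String[] args) {
           // Two entries for "file-1" model two upload results backed by the same file.
           List<String> handles = Arrays.asList("file-1", "file-1", "file-2");

           // groupingBy buckets equal elements under one key; summingLong(x -> 1L)
           // adds 1 per occurrence, i.e. it yields a reference count per handle.
           Map<String, Long> refCounts =
                   handles.stream().collect(groupingBy(h -> h, summingLong(x -> 1L)));

           // Prints (in unspecified order): file-1 -> 2, file-2 -> 1
           refCounts.forEach((handle, refCount) -> System.out.println(handle + " -> " + refCount));
       }
   }
   ```

   The `summingLong(x -> 1L)` downstream collector simply counts occurrences per key, which is exactly the per-handle reference count the registry needs to start tracking with.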



##########
flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/TaskChangelogRegistryImplTest.java:
##########
@@ -50,10 +45,13 @@ public void testDiscardedWhenNotUsed() {
     public void testNotDiscardedIfStoppedTracking() {
         TaskChangelogRegistry registry = new TaskChangelogRegistryImpl(directExecutor());
         TestingStreamStateHandle handle = new TestingStreamStateHandle();
-        List<UUID> backends = Arrays.asList(UUID.randomUUID(), UUID.randomUUID());
-        registry.startTracking(handle, new HashSet<>(backends));
+        long refCount = 2;
+        registry.startTracking(handle, 2);

Review Comment:
   ```suggestion
           registry.startTracking(handle, refCount);
   ```



##########
flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/TestingBatchingUploadScheduler.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.changelog.fs;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.Queue;
+
+/** Testing {@link StateChangeUploadScheduler} implementation that queues upload tasks until they are explicitly scheduled. */
+class TestingBatchingUploadScheduler implements StateChangeUploadScheduler {

Review Comment:
   Can we reuse `StateChangeUploadScheduler#directScheduler()` instead of `TestingBatchingUploadScheduler`?
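
   For context on the question, a sketch of the behavioral difference at stake, using simplified stand-in types rather than the real `org.apache.flink.changelog.fs` interfaces: the tests above interleave `persist`/`confirm`/`truncate` with explicit `scheduleAll()` flushes, whereas a direct scheduler would presumably forward each task to the uploader immediately, leaving the test no way to control when uploads happen.

   ```java
   import java.io.IOException;
   import java.util.LinkedList;
   import java.util.Queue;

   /** Simplified stand-in, not Flink's StateChangeUploader. */
   interface DemoUploader {
       void upload(String task) throws IOException;
   }

   /** Queues tasks on upload() and only forwards them when scheduleAll() runs. */
   class DemoBatchingScheduler {
       private final DemoUploader uploader;
       private final Queue<String> pending = new LinkedList<>();

       DemoBatchingScheduler(DemoUploader uploader) {
           this.uploader = uploader;
       }

       // Enqueue only; nothing reaches the uploader yet.
       void upload(String task) {
           pending.add(task);
       }

       // Flush everything queued so far. A test can interleave writer calls with
       // these flushes to drive uploads deterministically.
       void scheduleAll() throws IOException {
           while (!pending.isEmpty()) {
               uploader.upload(pending.poll());
           }
       }
   }
   ```

   If the pre-emptive-upload scenario does not depend on deferring uploads, a direct scheduler would make the test smaller; if it does, the queueing variant seems necessary.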



##########
flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/TaskChangelogRegistryImplTest.java:
##########
@@ -37,11 +32,11 @@ public class TaskChangelogRegistryImplTest {
     public void testDiscardedWhenNotUsed() {
         TaskChangelogRegistry registry = new TaskChangelogRegistryImpl(directExecutor());
         TestingStreamStateHandle handle = new TestingStreamStateHandle();
-        List<UUID> backends = Arrays.asList(UUID.randomUUID(), UUID.randomUUID());
-        registry.startTracking(handle, new HashSet<>(backends));
-        for (UUID backend : backends) {
+        long refCount = 2;
+        registry.startTracking(handle, 2);

Review Comment:
   ```suggestion
           registry.startTracking(handle, refCount);
   ```


