fredia commented on code in PR #21822:
URL: https://github.com/apache/flink/pull/21822#discussion_r1144531026


##########
flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterTest.java:
##########
@@ -246,6 +247,223 @@ void testFileAvailableAfterClose() throws Exception {
         }
     }
 
+    @Test
+    void testLocalFileDiscard() throws Exception {
+        long appendPersistThreshold = 100;
+        TaskChangelogRegistry taskChangelogRegistry =
+                new TaskChangelogRegistryImpl(Executors.directExecutor());
+
+        try (DiscardRecordableStateChangeUploader uploader =
+                        new DiscardRecordableStateChangeUploader(taskChangelogRegistry);
+                TestingBatchingUploadScheduler uploadScheduler =
+                        new TestingBatchingUploadScheduler(uploader);
+                FsStateChangelogWriter writer =
+                        new FsStateChangelogWriter(
+                                UUID.randomUUID(),
+                                KeyGroupRange.of(KEY_GROUP, KEY_GROUP),
+                                uploadScheduler,
+                                appendPersistThreshold,
+                                new SyncMailboxExecutor(),
+                                taskChangelogRegistry,
+                                TestLocalRecoveryConfig.enabledForTest(),
+                                new LocalChangelogRegistryImpl(
+                                        Executors.newDirectExecutorService()))) {
+            SequenceNumber initialSqn = writer.initialSequenceNumber();
+
+            writer.append(KEY_GROUP, getBytes(10));
+
+            // checkpoint 1 trigger
+            SequenceNumber checkpoint1sqn = writer.nextSequenceNumber();
+            writer.persist(initialSqn, 1L);
+            uploadScheduler.scheduleAll(); // checkpoint 1 completed
+            writer.confirm(initialSqn, checkpoint1sqn, 1);
+
+            // trigger pre-emptive upload
+            writer.append(KEY_GROUP, getBytes(100));
+            uploadScheduler.scheduleAll();
+            writer.append(KEY_GROUP, getBytes(10));
+            // checkpoint 2 trigger
+            SequenceNumber checkpoint2sqn = writer.nextSequenceNumber();
+            CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> future2 =
+                    writer.persist(initialSqn, 2L);
+            uploadScheduler.scheduleAll(); // checkpoint 2 completed
+            writer.confirm(initialSqn, checkpoint2sqn, 2);
+            SnapshotResult<ChangelogStateHandleStreamImpl> result2 = future2.get();
+            for (Tuple2<StreamStateHandle, Long> handleAndOffset :
+                    result2.getTaskLocalSnapshot().getHandlesAndOffsets()) {
+                assertThat(uploader.isDiscarded(handleAndOffset.f0)).isFalse();
+            }
+
+            // materialization 1 trigger
+            SequenceNumber materializationSqn = writer.nextSequenceNumber();
+            writer.append(KEY_GROUP, getBytes(10));
+
+            // materialization 1 completed
+            // checkpoint 3 trigger
+            SequenceNumber checkpoint3sqn = writer.nextSequenceNumber();
+            writer.persist(materializationSqn, 3L);
+            uploadScheduler.scheduleAll(); // checkpoint 3 completed
+            writer.confirm(materializationSqn, checkpoint3sqn, 3);
+            for (Tuple2<StreamStateHandle, Long> handleAndOffset :
+                    result2.getTaskLocalSnapshot().getHandlesAndOffsets()) {
+                assertThat(uploader.isDiscarded(handleAndOffset.f0)).isTrue();
+            }
+        }
+    }
+
+    @Test
+    void testLocalFileAfterMaterialize() throws Exception {
+        // If register local files when confirm(), the following case will fail:
+        // cp1 trigger: file1,file1'(local)
+        // JM: register [file1] to sharedRegistry
+        // cp1 complete: stopTracking [file1], register [file1'] to localRegistry
+        // cp2 trigger: file1,file1',file2,file2'
+        // JM: register [file1,file2] to sharedRegistry
+        // cp2 complete: stopTracking [file1,file1',file2,file2'], register [file1',file2'] to
+        // localRegistry
+        // cp1 subsume
+        // cp3 trigger:  file1,file1',file2,file2',file3,file3'
+        // materialization: uploaded.clear()
+        // JM: register [file1,file2,file3] to sharedRegistry
+        // cp3 complete: stopTracking [], register [] to localRegistry
+        // cp2 subsume: [file1', file2'] are discarded
+        // if restore from cp3: local file1',file2' are not found
+        long appendPersistThreshold = 100;
+        TaskChangelogRegistry taskChangelogRegistry =
+                new TaskChangelogRegistryImpl(Executors.directExecutor());
+
+        try (DiscardRecordableStateChangeUploader uploader =
+                        new DiscardRecordableStateChangeUploader(taskChangelogRegistry);
+                TestingBatchingUploadScheduler uploadScheduler =
+                        new TestingBatchingUploadScheduler(uploader);
+                FsStateChangelogWriter writer =
+                        new FsStateChangelogWriter(
+                                UUID.randomUUID(),
+                                KeyGroupRange.of(KEY_GROUP, KEY_GROUP),
+                                uploadScheduler,
+                                appendPersistThreshold,
+                                new SyncMailboxExecutor(),
+                                taskChangelogRegistry,
+                                TestLocalRecoveryConfig.enabledForTest(),
+                                new LocalChangelogRegistryImpl(
+                                        Executors.newDirectExecutorService()))) {
+            SequenceNumber initialSqn = writer.initialSequenceNumber();
+
+            writer.append(KEY_GROUP, getBytes(10));
+
+            // checkpoint 1 trigger
+            SequenceNumber checkpoint1sqn = writer.nextSequenceNumber();
+            writer.persist(initialSqn, 1L);
+            uploadScheduler.scheduleAll(); // checkpoint 1 completed
+            writer.confirm(initialSqn, checkpoint1sqn, 1);
+
+            writer.append(KEY_GROUP, getBytes(10));
+            // checkpoint 2 trigger
+            SequenceNumber checkpoint2sqn = writer.nextSequenceNumber();
+            writer.persist(initialSqn, 2L);
+            uploadScheduler.scheduleAll(); // checkpoint 2 completed
+            writer.confirm(initialSqn, checkpoint2sqn, 2);
+            writer.truncate(initialSqn);

Review Comment:
   Deleted it, it was used to highlight `notifySubsume()`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to