tsreaper commented on a change in pull request #13:
URL: https://github.com/apache/flink-table-store/pull/13#discussion_r790492629



##########
File path: 
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
##########
@@ -134,26 +139,36 @@ public void sync() throws Exception {
     }
 
     private Increment drainIncrement() {
+        // drain files to create Increment
         Increment increment =
                 new Increment(
                         new ArrayList<>(newFiles),
-                        new ArrayList<>(compactBefore),
+                        new ArrayList<>(compactBefore.values()),
                         new ArrayList<>(compactAfter));
         newFiles.clear();
         compactBefore.clear();
         compactAfter.clear();
+
+        // return increment
         return increment;

Review comment:
       No need for this comment.

##########
File path: 
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
##########
@@ -112,8 +115,10 @@ private void flush() throws Exception {
             Iterator<KeyValue> iterator = memTable.iterator(keyComparator, 
accumulator);
             List<SstFileMeta> files =
                     
sstFile.write(CloseableIterator.adapterForIterator(iterator), 0);
-            newFiles.addAll(files);
-            files.forEach(levels::addLevel0File);
+            for (SstFileMeta file : files) {
+                newFiles.add(file);
+                levels.addLevel0File(file);
+            }

Review comment:
       No need for this change?

##########
File path: 
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
##########
@@ -134,26 +139,36 @@ public void sync() throws Exception {
     }
 
     private Increment drainIncrement() {
+        // drain files to create Increment

Review comment:
       No need for this comment.

##########
File path: 
flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
##########
@@ -61,7 +61,12 @@
 import static java.util.Collections.singletonList;
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** Test for {@link MergeTree}. */
+/**
+ * Test for {@link MergeTree}.
+ *
+ * <p>Manual test: please adjust TARGET_FILE_SIZE to 1, so that a large number 
of upgrade files will
+ * be generated.

Review comment:
       No need for manual test. For `testCloseUpgrade` set write buffer size to 
4kb, page size to 1kb and target file size to 1kb. With 1000 records and 
`@RepeatedTest(10)` to run 10 times it has quite some probability to fail 
without this fix.

##########
File path: 
flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
##########
@@ -173,7 +173,11 @@ public void run() {
             } catch (Exception e) {
                 throw new RuntimeException(e);
             }
-            writer.close();
+            try {
+                writer.close();
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }

Review comment:
       Remove `try {.writer.sync() }` above this code fragment because it is 
included in `writer.close()`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to