gaborgsomogyi commented on code in PR #24914:
URL: https://github.com/apache/flink/pull/24914#discussion_r1633243846


##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactService.java:
##########
@@ -154,23 +158,27 @@ private List<Path> getCompactingPath(CompactorRequest request) throws IOExceptio
         return compactingFiles;
     }
 
-    private static Path assembleCompactedFilePath(Path uncompactedPath) {
-        String uncompactedName = uncompactedPath.getName();
-        if (uncompactedName.startsWith(".")) {
-            uncompactedName = uncompactedName.substring(1);
-        }
-        return new Path(uncompactedPath.getParent(), COMPACTED_PREFIX + uncompactedName);
-    }
-
-    private static CompactingFileWriter.Type getWriterType(FileCompactor fileCompactor) {
+    @VisibleForTesting
+    static Type getWriterType(FileCompactor fileCompactor) {
         if (fileCompactor instanceof OutputStreamBasedFileCompactor) {
-            return CompactingFileWriter.Type.OUTPUT_STREAM;
+            return fileCompactor instanceof ConcatFileCompactor
+                            && ((ConcatFileCompactor) fileCompactor).isCompressed()
+                    ? Type.COMPRESSED_STREAM
+                    : Type.OUTPUT_STREAM;
         } else if (fileCompactor instanceof RecordWiseFileCompactor) {
-            return CompactingFileWriter.Type.RECORD_WISE;
+            return Type.RECORD_WISE;
         } else {
             throw new UnsupportedOperationException(
                     "Unable to crate compacting file writer for compactor:"
                             + fileCompactor.getClass());
         }
     }
+
+    private static Path assembleCompactedFilePath(Path uncompactedPath) {
+        String uncompactedName = uncompactedPath.getName();
+        if (uncompactedName.startsWith(".")) {
+            uncompactedName = uncompactedName.substring(1);
+        }
+        return new Path(uncompactedPath.getParent(), COMPACTED_PREFIX + uncompactedName);

Review Comment:
   No strong opinion, just asking: compaction/compression normally adds a suffix, so why a prefix here?



##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactService.java:
##########
@@ -154,23 +158,27 @@ private List<Path> getCompactingPath(CompactorRequest request) throws IOExceptio
         return compactingFiles;
     }
 
-    private static Path assembleCompactedFilePath(Path uncompactedPath) {
-        String uncompactedName = uncompactedPath.getName();
-        if (uncompactedName.startsWith(".")) {
-            uncompactedName = uncompactedName.substring(1);
-        }
-        return new Path(uncompactedPath.getParent(), COMPACTED_PREFIX + uncompactedName);
-    }
-
-    private static CompactingFileWriter.Type getWriterType(FileCompactor fileCompactor) {
+    @VisibleForTesting
+    static Type getWriterType(FileCompactor fileCompactor) {
         if (fileCompactor instanceof OutputStreamBasedFileCompactor) {
-            return CompactingFileWriter.Type.OUTPUT_STREAM;
+            return fileCompactor instanceof ConcatFileCompactor
+                            && ((ConcatFileCompactor) fileCompactor).isCompressed()
+                    ? Type.COMPRESSED_STREAM
+                    : Type.OUTPUT_STREAM;
         } else if (fileCompactor instanceof RecordWiseFileCompactor) {
-            return CompactingFileWriter.Type.RECORD_WISE;
+            return Type.RECORD_WISE;
         } else {
             throw new UnsupportedOperationException(
                     "Unable to crate compacting file writer for compactor:"
                             + fileCompactor.getClass());
         }
     }
+
+    private static Path assembleCompactedFilePath(Path uncompactedPath) {
+        String uncompactedName = uncompactedPath.getName();
+        if (uncompactedName.startsWith(".")) {

Review Comment:
   Are we calling this with `..`?
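
   To make the question concrete, here is a minimal standalone sketch of the name handling shown in the diff. Plain strings stand in for Flink's `Path`, the class name `CompactedNameSketch` is made up for illustration, and the value `"compacted-"` for `COMPACTED_PREFIX` is an assumption, since the constant's definition is not part of this hunk.

```java
// Standalone sketch: plain strings stand in for Flink's Path so it runs without Flink.
class CompactedNameSketch {
    // Assumption: the real constant's value is not visible in the diff.
    static final String COMPACTED_PREFIX = "compacted-";

    // Mirrors the name logic of assembleCompactedFilePath: strip at most one
    // leading dot, then prepend the prefix.
    static String compactedName(String uncompactedName) {
        if (uncompactedName.startsWith(".")) {
            uncompactedName = uncompactedName.substring(1);
        }
        return COMPACTED_PREFIX + uncompactedName;
    }

    public static void main(String[] args) {
        // Hidden file, visible file, and the two dot-only edge cases.
        for (String name : new String[] {".part-0", "part-0", "..", "."}) {
            System.out.println("'" + name + "' -> '" + compactedName(name) + "'");
        }
    }
}
```

   Under these assumptions only one leading dot is removed, so `..` would become `compacted-.`, and a hidden `.part-0` and a visible `part-0` would map to the same compacted name.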



##########
flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputStreamBasedPartFileWriter.java:
##########
@@ -50,7 +50,7 @@ public abstract class OutputStreamBasedPartFileWriter<IN, BucketID>
 
     @Nullable final Path targetPath;
 
-    private CompactingFileWriter.Type writeType = null;
+    private Type writeType = null;

Review Comment:
   With the `CompactingFileWriter` prefix removed, a bare `Type` is hard to read. I suggest either putting the prefix back or renaming it to something more meaningful.
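
   A rough sketch of the two options, for comparison only. The interface below merely mimics the nested-enum shape implied by the diff, and `CompactingWriterType` and `WriterTypeNamingSketch` are hypothetical names, not something proposed in the PR.

```java
// Standalone sketch; this interface only mimics the shape implied by the diff.
interface CompactingFileWriter {
    enum Type { RECORD_WISE, OUTPUT_STREAM, COMPRESSED_STREAM }
}

class WriterTypeNamingSketch {
    // Option 1: keep the qualifier, so the declaration says what kind of type this is.
    CompactingFileWriter.Type writeType = CompactingFileWriter.Type.OUTPUT_STREAM;

    // Option 2 (hypothetical name): an enum whose name is self-describing when unqualified.
    enum CompactingWriterType { RECORD_WISE, OUTPUT_STREAM, COMPRESSED_STREAM }

    CompactingWriterType renamedWriteType = CompactingWriterType.OUTPUT_STREAM;

    public static void main(String[] args) {
        WriterTypeNamingSketch sketch = new WriterTypeNamingSketch();
        System.out.println(sketch.writeType + " / " + sketch.renamedWriteType);
    }
}
```

   Either form keeps the field declaration readable without having to check the imports.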



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
