ferenc-csaky commented on code in PR #24914:
URL: https://github.com/apache/flink/pull/24914#discussion_r1634721788


##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactService.java:
##########
@@ -154,23 +158,27 @@ private List<Path> getCompactingPath(CompactorRequest request) throws IOExceptio
         return compactingFiles;
     }
 
-    private static Path assembleCompactedFilePath(Path uncompactedPath) {
-        String uncompactedName = uncompactedPath.getName();
-        if (uncompactedName.startsWith(".")) {
-            uncompactedName = uncompactedName.substring(1);
-        }
-        return new Path(uncompactedPath.getParent(), COMPACTED_PREFIX + uncompactedName);
-    }
-
-    private static CompactingFileWriter.Type getWriterType(FileCompactor fileCompactor) {
+    @VisibleForTesting
+    static Type getWriterType(FileCompactor fileCompactor) {
         if (fileCompactor instanceof OutputStreamBasedFileCompactor) {
-            return CompactingFileWriter.Type.OUTPUT_STREAM;
+            return fileCompactor instanceof ConcatFileCompactor
+                            && ((ConcatFileCompactor) fileCompactor).isCompressed()
+                    ? Type.COMPRESSED_STREAM
+                    : Type.OUTPUT_STREAM;
         } else if (fileCompactor instanceof RecordWiseFileCompactor) {
-            return CompactingFileWriter.Type.RECORD_WISE;
+            return Type.RECORD_WISE;
         } else {
             throw new UnsupportedOperationException(
                     "Unable to crate compacting file writer for compactor:"
                             + fileCompactor.getClass());
         }
     }
+
+    private static Path assembleCompactedFilePath(Path uncompactedPath) {
+        String uncompactedName = uncompactedPath.getName();
+        if (uncompactedName.startsWith(".")) {

Review Comment:
   I did not touch this method. I only moved `getWriterType` above it to keep
the method visibility order in the file intact, because `getWriterType`
became package-private.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
