gaborgsomogyi commented on code in PR #24914: URL: https://github.com/apache/flink/pull/24914#discussion_r1633243846
########## flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactService.java: ########## @@ -154,23 +158,27 @@ private List<Path> getCompactingPath(CompactorRequest request) throws IOExceptio return compactingFiles; } - private static Path assembleCompactedFilePath(Path uncompactedPath) { - String uncompactedName = uncompactedPath.getName(); - if (uncompactedName.startsWith(".")) { - uncompactedName = uncompactedName.substring(1); - } - return new Path(uncompactedPath.getParent(), COMPACTED_PREFIX + uncompactedName); - } - - private static CompactingFileWriter.Type getWriterType(FileCompactor fileCompactor) { + @VisibleForTesting + static Type getWriterType(FileCompactor fileCompactor) { if (fileCompactor instanceof OutputStreamBasedFileCompactor) { - return CompactingFileWriter.Type.OUTPUT_STREAM; + return fileCompactor instanceof ConcatFileCompactor + && ((ConcatFileCompactor) fileCompactor).isCompressed() + ? Type.COMPRESSED_STREAM + : Type.OUTPUT_STREAM; } else if (fileCompactor instanceof RecordWiseFileCompactor) { - return CompactingFileWriter.Type.RECORD_WISE; + return Type.RECORD_WISE; } else { throw new UnsupportedOperationException( "Unable to crate compacting file writer for compactor:" + fileCompactor.getClass()); } } + + private static Path assembleCompactedFilePath(Path uncompactedPath) { + String uncompactedName = uncompactedPath.getName(); + if (uncompactedName.startsWith(".")) { + uncompactedName = uncompactedName.substring(1); + } + return new Path(uncompactedPath.getParent(), COMPACTED_PREFIX + uncompactedName); Review Comment: No strong opinion, just asking: compaction/compression normally adds a postfix, so why a prefix here? 
########## flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactService.java: ########## @@ -154,23 +158,27 @@ private List<Path> getCompactingPath(CompactorRequest request) throws IOExceptio return compactingFiles; } - private static Path assembleCompactedFilePath(Path uncompactedPath) { - String uncompactedName = uncompactedPath.getName(); - if (uncompactedName.startsWith(".")) { - uncompactedName = uncompactedName.substring(1); - } - return new Path(uncompactedPath.getParent(), COMPACTED_PREFIX + uncompactedName); - } - - private static CompactingFileWriter.Type getWriterType(FileCompactor fileCompactor) { + @VisibleForTesting + static Type getWriterType(FileCompactor fileCompactor) { if (fileCompactor instanceof OutputStreamBasedFileCompactor) { - return CompactingFileWriter.Type.OUTPUT_STREAM; + return fileCompactor instanceof ConcatFileCompactor + && ((ConcatFileCompactor) fileCompactor).isCompressed() + ? Type.COMPRESSED_STREAM + : Type.OUTPUT_STREAM; } else if (fileCompactor instanceof RecordWiseFileCompactor) { - return CompactingFileWriter.Type.RECORD_WISE; + return Type.RECORD_WISE; } else { throw new UnsupportedOperationException( "Unable to crate compacting file writer for compactor:" + fileCompactor.getClass()); } } + + private static Path assembleCompactedFilePath(Path uncompactedPath) { + String uncompactedName = uncompactedPath.getName(); + if (uncompactedName.startsWith(".")) { Review Comment: Are we calling this with `..`? 
########## flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputStreamBasedPartFileWriter.java: ########## @@ -50,7 +50,7 @@ public abstract class OutputStreamBasedPartFileWriter<IN, BucketID> @Nullable final Path targetPath; - private CompactingFileWriter.Type writeType = null; + private Type writeType = null; Review Comment: When we remove the `CompactingFileWriter` prefix, `Type` on its own is hard to read. I suggest either putting the prefix back or renaming it to something more meaningful. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org