luoyuxia commented on code in PR #20469:
URL: https://github.com/apache/flink/pull/20469#discussion_r939595765
##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionLoader.java:
##########
@@ -52,17 +52,27 @@ public class PartitionLoader implements Closeable {
     private final TableMetaStoreFactory.TableMetaStore metaStore;
     // whether it's to load files to local file system
     private final boolean isToLocal;
+    private final boolean writeSuccessFileEnabled;
+    private final String fileName;

Review Comment:
   This field is redundant.

##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionLoader.java:
##########
@@ -52,17 +52,27 @@ public class PartitionLoader implements Closeable {
     private final TableMetaStoreFactory.TableMetaStore metaStore;
     // whether it's to load files to local file system
     private final boolean isToLocal;
+    private final boolean writeSuccessFileEnabled;

Review Comment:
   We may not need to keep this field in this class. When `commitPolicy` is not null, apply the policy.

##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemCommitter.java:
##########
@@ -83,7 +87,17 @@ public void commitPartitions() throws Exception {
         List<Path> taskPaths = listTaskTemporaryPaths(fs, tmpPath);

         try (PartitionLoader loader =
-                new PartitionLoader(overwrite, fs, metaStoreFactory, isToLocal)) {
+                new PartitionLoader(
+                        overwrite,
+                        fs,
+                        metaStoreFactory,
+                        isToLocal,
+                        flinkConfig.get(

Review Comment:
   Pass the `flinkConfig` to `PartitionLoader`?

##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionLoader.java:
##########
@@ -154,8 +169,69 @@ private void moveFiles(List<Path> srcDirs, Path destDir) throws Exception {
         }
     }

+    private void createSuccessFile(LinkedHashMap<String, String> partitionSpec, Path path)
+            throws Exception {
+        if (writeSuccessFileEnabled) {
+            PartitionCommitPolicy.Context context =
+                    new CommitPolicyContextImpl(partitionSpec, path);
+            commitPolicy.commit(context);
+        }
+    }
+
     @Override
     public void close() throws IOException {
         metaStore.close();
     }
+
+    private class CommitPolicyContextImpl implements PartitionCommitPolicy.Context {

Review Comment:
   ```suggestion
       private static class CommitPolicyContextImpl implements PartitionCommitPolicy.Context {
   ```

##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionLoader.java:
##########
@@ -154,8 +169,69 @@ private void moveFiles(List<Path> srcDirs, Path destDir) throws Exception {
         }
     }

+    private void createSuccessFile(LinkedHashMap<String, String> partitionSpec, Path path)

Review Comment:
   Rename to `commitPartition`?
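To make the `commitPolicy` suggestions above concrete, here is a minimal sketch of what a `PartitionLoader` without the `writeSuccessFileEnabled` flag might look like. The parameter order, the nullable `commitPolicy` field, and the method name `commitPartition` are assumptions for illustration, not the PR's final code:

```java
// Sketch only: assumes PartitionLoader is handed an optional PartitionCommitPolicy
// (built by the caller, e.g. FileSystemCommitter, from the Flink configuration)
// instead of a writeSuccessFileEnabled flag plus a success-file name.
public PartitionLoader(
        boolean overwrite,
        FileSystem fs,
        TableMetaStoreFactory metaStoreFactory,
        boolean isToLocal,
        @Nullable PartitionCommitPolicy commitPolicy)
        throws Exception {
    // ... existing field assignments stay unchanged ...
    this.commitPolicy = commitPolicy;
}

// Renamed from createSuccessFile(); "apply the policy if present" replaces the flag check.
private void commitPartition(LinkedHashMap<String, String> partitionSpec, Path path)
        throws Exception {
    if (commitPolicy != null) {
        commitPolicy.commit(new CommitPolicyContextImpl(partitionSpec, path));
    }
}
```

With that shape, the caller that owns the Flink configuration decides whether a policy is needed at all, and `PartitionLoader` stays unaware of any success-file options.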
##########
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemCommitterTest.java:
##########
@@ -198,6 +205,83 @@ void testEmptyPartition() throws Exception {
         assertThat(new File(emptyPartitionFile, "f1")).exists();
     }

+    @Test
+    public void testWriteSuccessFile() throws Exception {
+        Configuration flinkConfig = new Configuration();
+        flinkConfig.setBoolean(
+                FileSystemConnectorOptions.HIVE_SINK_PARTITION_COMMIT_SUCCESS_FILE_ENABLED, true);
+        String successFileName =
+                flinkConfig.get(FileSystemConnectorOptions.SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME);
+
+        // add new empty partition
+        LinkedHashMap<String, String> staticPartitions = new LinkedHashMap<String, String>();
+        staticPartitions.put("dt", "2022-08-05");
+        FileSystemCommitter committer =
+                new FileSystemCommitter(
+                        fileSystemFactory,
+                        metaStoreFactory,
+                        true,
+                        new Path(path.toString()),
+                        1,
+                        false,
+                        staticPartitions,
+                        flinkConfig);
+
+        createFile(path, "task-1/dt=2022-08-02/");
+        createFile(path, "task-2/dt=2022-08-02/");
+
+        committer.commitPartitions();
+
+        File emptyPartitionFile = new File(outputPath.toFile(), "dt=2022-08-02");
+        assertThat(emptyPartitionFile).exists();
+        assertThat(emptyPartitionFile).isDirectory();
+        assertThat(emptyPartitionFile).isNotEmptyDirectory();
+        assertThat(new File(emptyPartitionFile, successFileName)).exists();
+
+        // test not partition table
+        committer =
+                new FileSystemCommitter(
+                        fileSystemFactory,
+                        metaStoreFactory,
+                        true,
+                        new Path(path.toString()),
+                        0,
+                        false,
+                        new LinkedHashMap<String, String>(),
+                        flinkConfig);
+
+        createFile(path, "task-1/", "f1", "f2");
+        createFile(path, "task-2/", "f3");
+        committer.commitPartitions();
+        assertThat(new File(outputPath.toFile(), "f1")).exists();
+        assertThat(new File(outputPath.toFile(), "f2")).exists();
+        assertThat(new File(outputPath.toFile(), "f3")).exists();
+        assertThat(new File(outputPath.toFile(), successFileName)).exists();
+
+        // test dynamic partition

Review Comment:
   Should this comment be `// test commit multiple partitions`?
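If the intent of that branch is indeed to commit multiple partitions in one run, a hypothetical continuation of the quoted test could look like the sketch below. The partition values, file names, and assertions are guesses for illustration only and are not taken from the PR:

```java
// Hypothetical continuation: dynamic partitioning, two partitions committed in one run,
// each partition directory expected to contain the success file afterwards.
committer =
        new FileSystemCommitter(
                fileSystemFactory,
                metaStoreFactory,
                true,
                new Path(path.toString()),
                1,
                false,
                new LinkedHashMap<String, String>(),
                flinkConfig);

createFile(path, "task-1/dt=2022-08-06/", "f4");
createFile(path, "task-2/dt=2022-08-07/", "f5");
committer.commitPartitions();

assertThat(new File(new File(outputPath.toFile(), "dt=2022-08-06"), successFileName)).exists();
assertThat(new File(new File(outputPath.toFile(), "dt=2022-08-07"), successFileName)).exists();
```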
##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemConnectorOptions.java:
##########
@@ -245,6 +245,13 @@ public class FileSystemConnectorOptions {
                     .withDescription(
                             "The compaction target file size, the default value is the rolling file size.");

+    public static final ConfigOption<Boolean> HIVE_SINK_PARTITION_COMMIT_SUCCESS_FILE_ENABLED =

Review Comment:
   It's a Hive-specific configuration, so it should be moved to `HiveOptions`.

##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemConnectorOptions.java:
##########
@@ -245,6 +245,13 @@ public class FileSystemConnectorOptions {
                     .withDescription(
                             "The compaction target file size, the default value is the rolling file size.");

+    public static final ConfigOption<Boolean> HIVE_SINK_PARTITION_COMMIT_SUCCESS_FILE_ENABLED =
+            key("hive.sink.partition-commit.success-file.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to create '_success' file in the partition or table directory when using Hive Batch Sink.");

Review Comment:
   ```suggestion
                            "Whether to create success-file in the partition or table directory when using Hive batch sink.");
   ```

##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionLoader.java:
##########
@@ -154,8 +169,69 @@ private void moveFiles(List<Path> srcDirs, Path destDir) throws Exception {
         }
     }

+    private void createSuccessFile(LinkedHashMap<String, String> partitionSpec, Path path)
+            throws Exception {
+        if (writeSuccessFileEnabled) {
+            PartitionCommitPolicy.Context context =
+                    new CommitPolicyContextImpl(partitionSpec, path);
+            commitPolicy.commit(context);
+        }
+    }
+
     @Override
     public void close() throws IOException {
         metaStore.close();
     }
+
+    private class CommitPolicyContextImpl implements PartitionCommitPolicy.Context {

Review Comment:
   Add a Javadoc for this class explaining that it is currently only used for writing the success file, so only the `partitionPath` and `partitionSpec` methods are needed.

##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemConnectorOptions.java:
##########
@@ -245,6 +245,13 @@ public class FileSystemConnectorOptions {
                     .withDescription(
                             "The compaction target file size, the default value is the rolling file size.");

+    public static final ConfigOption<Boolean> HIVE_SINK_PARTITION_COMMIT_SUCCESS_FILE_ENABLED =

Review Comment:
   And I think we should also add a new option like `HIVE_SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME` to `HiveOptions`.

##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionLoader.java:
##########
@@ -102,22 +112,27 @@ public void loadNonPartition(List<Path> srcDirs) throws Exception {
     public void loadEmptyPartition(LinkedHashMap<String, String> partSpec) throws Exception {
         Optional<Path> pathFromMeta = metaStore.getPartition(partSpec);
         if (pathFromMeta.isPresent() && !overwrite) {
+            createSuccessFile(partSpec, pathFromMeta.get());
             return;
         }

         Path path = new Path(metaStore.getLocationPath(), generatePartitionPath(partSpec));
         if (pathFromMeta.isPresent()) {
             fs.delete(pathFromMeta.get(), true);
             fs.mkdirs(path);
         }
+        createSuccessFile(partSpec, path);

Review Comment:
   Should this method also be called in `loadPartition` and `loadNonPartition`?
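As a rough illustration of the two `HiveOptions` comments above, the relocated options might look like the sketch below. The exact keys, the `_SUCCESS` default value, and the second option's description are assumptions to be settled in the PR, not something this review decides:

```java
// Hypothetical definitions in HiveOptions (assumes the usual static import of ConfigOptions.key).
public static final ConfigOption<Boolean> HIVE_SINK_PARTITION_COMMIT_SUCCESS_FILE_ENABLED =
        key("hive.sink.partition-commit.success-file.enabled")
                .booleanType()
                .defaultValue(false)
                .withDescription(
                        "Whether to create success-file in the partition or table directory when using Hive batch sink.");

public static final ConfigOption<String> HIVE_SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME =
        key("hive.sink.partition-commit.success-file.name")
                .stringType()
                .defaultValue("_SUCCESS")
                .withDescription(
                        "The name of the success file written by the Hive batch sink; only effective when the enabled option above is true.");
```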