wuchong commented on code in PR #20469:
URL: https://github.com/apache/flink/pull/20469#discussion_r942380136
##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java:
##########
@@ -95,6 +95,20 @@ public class HiveOptions {
                             + " When the value is over estimated, Flink will tend to pack Hive's data into less splits, which will be helpful when Hive's table contains many small files."
                             + " And vice versa. It only works for the Hive table stored as ORC format.");
 
+    public static final ConfigOption<String> SINK_PARTITION_COMMIT_POLICY_KIND =
+            key("sink.partition-commit.policy.kind")
+                    .stringType()
+                    .defaultValue("metastore,success-file")
+                    .withDescription(
+                            "Policy to commit a partition is to notify the downstream"
+                                    + " application that the partition has finished writing, the partition"
+                                    + " is ready to be read."
+                                    + " metastore: add partition to metastore. "
+                                    + " success-file: add '_success' file to directory."

Review Comment:
```
success-file: add a success file to the partition directory. The success file name can be configured by the 'sink.partition-commit.success-file.name' option.
```

##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java:
##########
@@ -95,6 +95,20 @@ public class HiveOptions {
                             + " When the value is over estimated, Flink will tend to pack Hive's data into less splits, which will be helpful when Hive's table contains many small files."
                             + " And vice versa. It only works for the Hive table stored as ORC format.");
 
+    public static final ConfigOption<String> SINK_PARTITION_COMMIT_POLICY_KIND =

Review Comment:
Add a comment explaining why we need to re-define the same configuration option here, e.g.,
```java
/**
 * Hive users usually commit a partition to the metastore and also write a _SUCCESS file. That's
 * why we define the same option as {@link
 * FileSystemConnectorOptions#SINK_PARTITION_COMMIT_POLICY_KIND}, but with a different default value.
 */
```

##########
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemCommitterTest.java:
##########
@@ -173,7 +206,14 @@ void testEmptyPartition() throws Exception {
         assertThat(emptyPartitionFile).exists();
         assertThat(emptyPartitionFile).isDirectory();
-        assertThat(emptyPartitionFile).isEmptyDirectory();
+        files =
+                FileUtils.listFilesInDirectory(
+                        Paths.get(emptyPartitionFile.getPath()),
+                        (path) ->
+                                !path.toFile().isHidden()
+                                        && !path.toFile().getName().startsWith("_"))

Review Comment:
I think this can be simplified to:
```java
assertThat(emptyPartitionFile)
        .isDirectoryNotContaining(file -> !file.getName().equals("_SUCCESS"));
```
There shouldn't be any hidden files in the directory, right?

##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java:
##########
@@ -337,6 +340,12 @@ private DataStreamSink<Row> createBatchSink(
         builder.setTempPath(
                 new org.apache.flink.core.fs.Path(toStagingDir(stagingParentDir, jobConf)));
         builder.setOutputFileConfig(fileNaming);
+        builder.setIdentifier(identifier);
+        builder.setPolicyKind(conf.get(HiveOptions.SINK_PARTITION_COMMIT_POLICY_KIND));
+        builder.setCustomClass(
+                conf.get(FileSystemConnectorOptions.SINK_PARTITION_COMMIT_POLICY_CLASS));
+        builder.setSuccessFileName(
+                conf.get(FileSystemConnectorOptions.SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME));

Review Comment:
Please use only options from `HiveOptions` in the Hive connector. Reading `FileSystemConnectorOptions` directly is error-prone, e.g., it can lead to an unexpected default value.
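To make the default-value pitfall concrete, here is a minimal, self-contained sketch in plain Java. `Option`, `SimpleConf`, `FS_POLICY_KIND`, and `HIVE_POLICY_KIND` are hypothetical stand-ins, not Flink's `ConfigOption`/`Configuration` API, and the "no default" on the filesystem side is assumed for illustration. Both constants share the key `sink.partition-commit.policy.kind` but carry different defaults, so the constant a sink reads through decides what a user gets when the key is left unset.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-ins for an option/config pair; not Flink's real classes. */
public class OptionDefaultDemo {

    /** Hypothetical option: a key plus a default value (null means "no default"). */
    static final class Option {
        final String key;
        final String defaultValue;

        Option(String key, String defaultValue) {
            this.key = key;
            this.defaultValue = defaultValue;
        }
    }

    /** Hypothetical config: returns the user-set value, else the given option's own default. */
    static final class SimpleConf {
        private final Map<String, String> values = new HashMap<>();

        void set(String key, String value) {
            values.put(key, value);
        }

        String get(Option option) {
            return values.getOrDefault(option.key, option.defaultValue);
        }
    }

    // Same key, different defaults (defaults assumed for illustration).
    static final Option FS_POLICY_KIND = new Option("sink.partition-commit.policy.kind", null);
    static final Option HIVE_POLICY_KIND =
            new Option("sink.partition-commit.policy.kind", "metastore,success-file");

    public static void main(String[] args) {
        SimpleConf conf = new SimpleConf(); // the user leaves the key unset
        System.out.println(conf.get(HIVE_POLICY_KIND)); // metastore,success-file
        System.out.println(conf.get(FS_POLICY_KIND)); // null
    }
}
```

Routing every read in the Hive connector through constants declared in `HiveOptions` keeps the effective default in a single place, even when some of those constants simply alias the filesystem ones.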
You can reuse the options by referring to their definitions in `FileSystemConnectorOptions`. For example, in `HiveOptions.java`:
```java
public static final ConfigOption<String> SINK_PARTITION_COMMIT_POLICY_CLASS =
        FileSystemConnectorOptions.SINK_PARTITION_COMMIT_POLICY_CLASS;

public static final ConfigOption<String> SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME =
        FileSystemConnectorOptions.SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME;
```

##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemCommitter.java:
##########
@@ -58,7 +59,9 @@ class FileSystemCommitter implements Serializable {
     private final boolean isToLocal;
     private final Path tmpPath;
     private final int partitionColumnSize;
+    private final ObjectIdentifier identifier;
     private final LinkedHashMap<String, String> staticPartitions;
+    private final List<PartitionCommitPolicy> policies;

Review Comment:
`PartitionCommitPolicy` is not serializable, but `FileSystemCommitter` implements `Serializable`. I checked, and `FileSystemCommitter` is only used as a local variable, so it doesn't need to implement the `Serializable` interface.
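The serializability concern can be reproduced with plain JDK serialization. In this sketch, `CommitPolicy`, `LoggingPolicy`, and `Committer` are hypothetical stand-ins for `PartitionCommitPolicy` and `FileSystemCommitter`, not the actual Flink classes. It shows that declaring `implements Serializable` is only a promise: `ObjectOutputStream.writeObject` throws `NotSerializableException` as soon as it reaches a field value whose class does not implement `Serializable`.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class SerializableFieldDemo {

    /** Hypothetical stand-in for PartitionCommitPolicy: does NOT extend Serializable. */
    interface CommitPolicy {
        void commit(String partition);
    }

    /** Hypothetical non-serializable policy implementation. */
    static final class LoggingPolicy implements CommitPolicy {
        @Override
        public void commit(String partition) {
            System.out.println("commit " + partition);
        }
    }

    /** Hypothetical stand-in for FileSystemCommitter: claims to be Serializable. */
    static final class Committer implements Serializable {
        private static final long serialVersionUID = 1L;
        private final List<CommitPolicy> policies = new ArrayList<>();

        Committer(CommitPolicy... ps) {
            for (CommitPolicy p : ps) {
                policies.add(p);
            }
        }
    }

    /** Returns true if Java serialization of the object succeeds. */
    static boolean trySerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false; // some reachable object does not implement Serializable
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(trySerialize(new Committer())); // true: empty list serializes
        // false: the ArrayList itself is serializable, but its LoggingPolicy element is not
        System.out.println(trySerialize(new Committer(new LoggingPolicy())));
    }
}
```

Since the committer is only created and used locally, dropping `implements Serializable` is simpler than forcing every policy implementation to become serializable.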