wuchong commented on code in PR #20469:
URL: https://github.com/apache/flink/pull/20469#discussion_r942380136


##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java:
##########
@@ -95,6 +95,20 @@ public class HiveOptions {
                                     + " When the value is over estimated, Flink will tend to pack Hive's data into less splits, which will be helpful when Hive's table contains many small files."
                                     + " And vice versa. It only works for the Hive table stored as ORC format.");
 
+    public static final ConfigOption<String> SINK_PARTITION_COMMIT_POLICY_KIND =
+            key("sink.partition-commit.policy.kind")
+                    .stringType()
+                    .defaultValue("metastore,success-file")
+                    .withDescription(
+                            "Policy to commit a partition is to notify the downstream"
+                                    + " application that the partition has finished writing, the partition"
+                                    + " is ready to be read."
+                                    + " metastore: add partition to metastore. "
+                                    + " success-file: add '_success' file to directory."

Review Comment:
   ```
   success-file: add a success file to the partition directory. The success file name can be configured by the 'sink.partition-commit.success-file.name' option.
   ```



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java:
##########
@@ -95,6 +95,20 @@ public class HiveOptions {
                                     + " When the value is over estimated, Flink will tend to pack Hive's data into less splits, which will be helpful when Hive's table contains many small files."
                                     + " And vice versa. It only works for the Hive table stored as ORC format.");
 
+    public static final ConfigOption<String> SINK_PARTITION_COMMIT_POLICY_KIND =

Review Comment:
   Add a comment explaining why we need to redefine the same configuration
option here, e.g.:
   
   ```java
       /**
        * Hive users usually commit a partition by adding it to the metastore and writing a
        * _SUCCESS file. That's why we define the same option as {@link
        * FileSystemConnectorOptions#SINK_PARTITION_COMMIT_POLICY_KIND}, but with a different
        * default value.
        */
   ```



##########
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemCommitterTest.java:
##########
@@ -173,7 +206,14 @@ void testEmptyPartition() throws Exception {
 
         assertThat(emptyPartitionFile).exists();
         assertThat(emptyPartitionFile).isDirectory();
-        assertThat(emptyPartitionFile).isEmptyDirectory();
+        files =
+                FileUtils.listFilesInDirectory(
+                                Paths.get(emptyPartitionFile.getPath()),
+                                (path) ->
+                                        !path.toFile().isHidden()
+                                                && 
!path.toFile().getName().startsWith("_"))

Review Comment:
   I think this can be simplified to:
   
   ```java
   assertThat(emptyPartitionFile)
           .isDirectoryNotContaining(file -> !file.getName().equals("_SUCCESS"));
   ```
   
   There shouldn't be hidden files in the directory, right?
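
For reference, a minimal JDK-only sketch of what that assertion checks: no entry in the directory other than the success file. The class and helper names are hypothetical and only for illustration. Note that an empty directory also satisfies this condition, just as it would with `isDirectoryNotContaining`.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

public class OnlySuccessFileDemo {

    /** Returns true if {@code dir} has no entry whose name differs from {@code successFileName}. */
    static boolean containsOnlySuccessFile(Path dir, String successFileName) {
        try (Stream<Path> entries = Files.list(dir)) {
            return entries.noneMatch(
                    p -> !p.getFileName().toString().equals(successFileName));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Creates a fresh temporary directory (hypothetical test helper). */
    static Path newTempDir() {
        try {
            return Files.createTempDirectory("empty-partition");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Creates an empty file with the given name inside {@code dir}. */
    static void touch(Path dir, String name) {
        try {
            Files.createFile(dir.resolve(name));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path dir = newTempDir();
        touch(dir, "_SUCCESS");
        // Only the success file is present.
        System.out.println(containsOnlySuccessFile(dir, "_SUCCESS"));
        touch(dir, "part-0");
        // A data file is now present as well.
        System.out.println(containsOnlySuccessFile(dir, "_SUCCESS"));
    }
}
```

Unlike the filter in the diff, this check would also flag hidden files and other `_`-prefixed files, which is the point of the question above.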
   



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java:
##########
@@ -337,6 +340,12 @@ private DataStreamSink<Row> createBatchSink(
         builder.setTempPath(
                 new org.apache.flink.core.fs.Path(toStagingDir(stagingParentDir, jobConf)));
         builder.setOutputFileConfig(fileNaming);
+        builder.setIdentifier(identifier);
+        builder.setPolicyKind(conf.get(HiveOptions.SINK_PARTITION_COMMIT_POLICY_KIND));
+        builder.setCustomClass(
+                conf.get(FileSystemConnectorOptions.SINK_PARTITION_COMMIT_POLICY_CLASS));
+        builder.setSuccessFileName(
+                conf.get(FileSystemConnectorOptions.SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME));

Review Comment:
   Please use only options from `HiveOptions` in the Hive connector. Using
`FileSystemConnectorOptions` directly is error-prone, e.g., it can silently
pick up an unexpected default value. You can reuse the options by referencing
their definitions from `FileSystemConnectorOptions`, for example, in
`HiveOptions.java`:
   
   ```java
   public static final ConfigOption<String> SINK_PARTITION_COMMIT_POLICY_CLASS =
           FileSystemConnectorOptions.SINK_PARTITION_COMMIT_POLICY_CLASS;

   public static final ConfigOption<String> SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME =
           FileSystemConnectorOptions.SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME;
   ```
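
To make the default-value pitfall concrete, here is a small standalone sketch. `Option`, `FileSystemOptions`, and `HiveLikeOptions` are hypothetical simplified stand-ins, not Flink's `ConfigOption` API, and the filesystem-side defaults shown (`null` and `_SUCCESS`) are assumptions for illustration. It shows that an aliased option stays in sync with the original, while reading a redefined key through the wrong constant yields a different default.

```java
import java.util.HashMap;
import java.util.Map;

public class OptionDefaultsDemo {

    /** Hypothetical, simplified typed option: a key plus a default value. */
    static final class Option {
        final String key;
        final String defaultValue;

        Option(String key, String defaultValue) {
            this.key = key;
            this.defaultValue = defaultValue;
        }
    }

    /** Returns the configured value, or the option's own default if the key is absent. */
    static String get(Map<String, String> conf, Option option) {
        return conf.getOrDefault(option.key, option.defaultValue);
    }

    /** Stand-in for the generic filesystem options (defaults are assumed). */
    static final class FileSystemOptions {
        static final Option POLICY_KIND =
                new Option("sink.partition-commit.policy.kind", null);
        static final Option SUCCESS_FILE_NAME =
                new Option("sink.partition-commit.success-file.name", "_SUCCESS");
    }

    /** Stand-in for connector-specific options. */
    static final class HiveLikeOptions {
        // Redefined: same key, connector-specific default.
        static final Option POLICY_KIND =
                new Option("sink.partition-commit.policy.kind", "metastore,success-file");
        // Reused: an alias of the original definition, so it can never diverge.
        static final Option SUCCESS_FILE_NAME = FileSystemOptions.SUCCESS_FILE_NAME;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>(); // the user configured nothing

        // Same key, different constants, different results.
        System.out.println(get(conf, HiveLikeOptions.POLICY_KIND));
        System.out.println(get(conf, FileSystemOptions.POLICY_KIND));

        // The aliased option resolves identically through either class.
        System.out.println(get(conf, HiveLikeOptions.SUCCESS_FILE_NAME));
    }
}
```

If the connector code consistently reads through its own options class, a later change of a default only needs to happen in one place.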



##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemCommitter.java:
##########
@@ -58,7 +59,9 @@ class FileSystemCommitter implements Serializable {
     private final boolean isToLocal;
     private final Path tmpPath;
     private final int partitionColumnSize;
+    private final ObjectIdentifier identifier;
     private final LinkedHashMap<String, String> staticPartitions;
+    private final List<PartitionCommitPolicy> policies;

Review Comment:
   `PartitionCommitPolicy` is not serializable, but `FileSystemCommitter` 
implements `Serializable`. I checked that `FileSystemCommitter` is only used as 
a local variable. Therefore, `FileSystemCommitter` doesn't need to implement 
the `Serializable` interface. 
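
As a standalone illustration of why the mismatch matters (the `Policy` and `Committer` types below are hypothetical stand-ins, not the Flink classes): a class that declares `Serializable` but holds a non-serializable, non-transient field compiles fine and only fails once something actually tries to serialize it.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

public class SerializableFieldDemo {

    /** Hypothetical policy type that does not implement Serializable. */
    interface Policy {
        void commit();
    }

    static class NoOpPolicy implements Policy {
        @Override
        public void commit() {}
    }

    /** Declares Serializable, but its field may hold non-serializable elements. */
    static class Committer implements Serializable {
        private static final long serialVersionUID = 1L;
        private final List<Policy> policies;

        Committer(List<Policy> policies) {
            this.policies = policies;
        }
    }

    /** Builds a committer that holds a single non-serializable policy. */
    static Committer committerWithPolicy() {
        List<Policy> policies = new ArrayList<>();
        policies.add(new NoOpPolicy());
        return new Committer(policies);
    }

    /** Returns true if Java serialization of {@code obj} succeeds. */
    static boolean canSerialize(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Fails at runtime: the list element is not serializable.
        System.out.println(canSerialize(committerWithPolicy()));
        // Succeeds only because the list happens to be empty.
        System.out.println(canSerialize(new Committer(new ArrayList<>())));
    }
}
```

Dropping the `Serializable` declaration from a class that is never serialized removes this latent failure mode and makes the intent clearer.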


