luoyuxia commented on code in PR #20469:
URL: https://github.com/apache/flink/pull/20469#discussion_r939595765


##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionLoader.java:
##########
@@ -52,17 +52,27 @@ public class PartitionLoader implements Closeable {
     private final TableMetaStoreFactory.TableMetaStore metaStore;
     // whether it's to load files to local file system
     private final boolean isToLocal;
+    private final boolean writeSuccessFileEnabled;
+    private final String fileName;

Review Comment:
   This field looks redundant.



##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionLoader.java:
##########
@@ -52,17 +52,27 @@ public class PartitionLoader implements Closeable {
     private final TableMetaStoreFactory.TableMetaStore metaStore;
     // whether it's to load files to local file system
     private final boolean isToLocal;
+    private final boolean writeSuccessFileEnabled;

Review Comment:
   We may not need to keep this field in this class. Instead, when `commitPolicy` is not null, apply the policy.
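   Roughly something like this (just a sketch; it assumes `commitPolicy` stays a nullable field on `PartitionLoader`):
   ```java
   // Sketch only: rely on the policy being non-null instead of a separate boolean flag.
   private void createSuccessFile(LinkedHashMap<String, String> partitionSpec, Path path)
           throws Exception {
       if (commitPolicy != null) {
           commitPolicy.commit(new CommitPolicyContextImpl(partitionSpec, path));
       }
   }
   ```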



##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemCommitter.java:
##########
@@ -83,7 +87,17 @@ public void commitPartitions() throws Exception {
         List<Path> taskPaths = listTaskTemporaryPaths(fs, tmpPath);
 
         try (PartitionLoader loader =
-                new PartitionLoader(overwrite, fs, metaStoreFactory, isToLocal)) {
+                new PartitionLoader(
+                        overwrite,
+                        fs,
+                        metaStoreFactory,
+                        isToLocal,
+                        flinkConfig.get(

Review Comment:
   Pass the `flinkConfig` to `PartitionLoader`?
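   For example (just a sketch; the exact constructor shape is an assumption):
   ```java
   // Sketch: hand the whole Configuration to the loader and let it resolve its own options.
   try (PartitionLoader loader =
           new PartitionLoader(overwrite, fs, metaStoreFactory, isToLocal, flinkConfig)) {
       // ... existing loadPartition / loadNonPartition / loadEmptyPartition calls unchanged ...
   }
   ```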



##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionLoader.java:
##########
@@ -154,8 +169,69 @@ private void moveFiles(List<Path> srcDirs, Path destDir) throws Exception {
         }
     }
 
+    private void createSuccessFile(LinkedHashMap<String, String> partitionSpec, Path path)
+            throws Exception {
+        if (writeSuccessFileEnabled) {
+            PartitionCommitPolicy.Context context =
+                    new CommitPolicyContextImpl(partitionSpec, path);
+            commitPolicy.commit(context);
+        }
+    }
+
     @Override
     public void close() throws IOException {
         metaStore.close();
     }
+
+    private class CommitPolicyContextImpl implements PartitionCommitPolicy.Context {

Review Comment:
   ```suggestion
       private static class CommitPolicyContextImpl implements PartitionCommitPolicy.Context {
   ```



##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionLoader.java:
##########
@@ -154,8 +169,69 @@ private void moveFiles(List<Path> srcDirs, Path destDir) throws Exception {
         }
     }
 
+    private void createSuccessFile(LinkedHashMap<String, String> partitionSpec, Path path)

Review Comment:
   Rename to `commitPartition`?



##########
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemCommitterTest.java:
##########
@@ -198,6 +205,83 @@ void testEmptyPartition() throws Exception {
         assertThat(new File(emptyPartitionFile, "f1")).exists();
     }
 
+    @Test
+    public void testWriteSuccessFile() throws Exception {
+        Configuration flinkConfig = new Configuration();
+        flinkConfig.setBoolean(
+                FileSystemConnectorOptions.HIVE_SINK_PARTITION_COMMIT_SUCCESS_FILE_ENABLED, true);
+        String successFileName =
+                flinkConfig.get(FileSystemConnectorOptions.SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME);
+
+        // add new empty partition
+        LinkedHashMap<String, String> staticPartitions = new LinkedHashMap<String, String>();
+        staticPartitions.put("dt", "2022-08-05");
+        FileSystemCommitter committer =
+                new FileSystemCommitter(
+                        fileSystemFactory,
+                        metaStoreFactory,
+                        true,
+                        new Path(path.toString()),
+                        1,
+                        false,
+                        staticPartitions,
+                        flinkConfig);
+
+        createFile(path, "task-1/dt=2022-08-02/");
+        createFile(path, "task-2/dt=2022-08-02/");
+
+        committer.commitPartitions();
+
+        File emptyPartitionFile = new File(outputPath.toFile(), "dt=2022-08-02");
+        assertThat(emptyPartitionFile).exists();
+        assertThat(emptyPartitionFile).isDirectory();
+        assertThat(emptyPartitionFile).isNotEmptyDirectory();
+        assertThat(new File(emptyPartitionFile, successFileName)).exists();
+
+        // test not partition table
+        committer =
+                new FileSystemCommitter(
+                        fileSystemFactory,
+                        metaStoreFactory,
+                        true,
+                        new Path(path.toString()),
+                        0,
+                        false,
+                        new LinkedHashMap<String, String>(),
+                        flinkConfig);
+
+        createFile(path, "task-1/", "f1", "f2");
+        createFile(path, "task-2/", "f3");
+        committer.commitPartitions();
+        assertThat(new File(outputPath.toFile(), "f1")).exists();
+        assertThat(new File(outputPath.toFile(), "f2")).exists();
+        assertThat(new File(outputPath.toFile(), "f3")).exists();
+        assertThat(new File(outputPath.toFile(), successFileName)).exists();
+
+        // test dynamic partition

Review Comment:
   Maybe `// test commit multiple partitions`?



##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemConnectorOptions.java:
##########
@@ -245,6 +245,13 @@ public class FileSystemConnectorOptions {
                    .withDescription(
                            "The compaction target file size, the default value is the rolling file size.");
 
+    public static final ConfigOption<Boolean> HIVE_SINK_PARTITION_COMMIT_SUCCESS_FILE_ENABLED =

Review Comment:
   It's a Hive-specific configuration; it should be moved to `HiveOptions`.
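   Roughly like this in `HiveOptions` (just a sketch; the key and constant name are kept from this PR, built with `ConfigOptions.key`):
   ```java
   // Sketch: Hive-specific option living in HiveOptions instead of FileSystemConnectorOptions.
   public static final ConfigOption<Boolean> HIVE_SINK_PARTITION_COMMIT_SUCCESS_FILE_ENABLED =
           ConfigOptions.key("hive.sink.partition-commit.success-file.enabled")
                   .booleanType()
                   .defaultValue(false)
                   .withDescription(
                           "Whether to create success-file in the partition or table directory when using Hive batch sink.");
   ```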



##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemConnectorOptions.java:
##########
@@ -245,6 +245,13 @@ public class FileSystemConnectorOptions {
                    .withDescription(
                            "The compaction target file size, the default value is the rolling file size.");
 
+    public static final ConfigOption<Boolean> HIVE_SINK_PARTITION_COMMIT_SUCCESS_FILE_ENABLED =
+            key("hive.sink.partition-commit.success-file.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to create '_success' file in the partition or table directory when using Hive Batch Sink.");

Review Comment:
   ```suggestion
                               "Whether to create success-file in the partition or table directory when using Hive batch sink.");
   ```



##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionLoader.java:
##########
@@ -154,8 +169,69 @@ private void moveFiles(List<Path> srcDirs, Path destDir) throws Exception {
         }
     }
 
+    private void createSuccessFile(LinkedHashMap<String, String> partitionSpec, Path path)
+            throws Exception {
+        if (writeSuccessFileEnabled) {
+            PartitionCommitPolicy.Context context =
+                    new CommitPolicyContextImpl(partitionSpec, path);
+            commitPolicy.commit(context);
+        }
+    }
+
     @Override
     public void close() throws IOException {
         metaStore.close();
     }
+
+    private class CommitPolicyContextImpl implements PartitionCommitPolicy.Context {

Review Comment:
   Add Javadoc for this class and explain that it is currently only used for writing the success file, so only the `partitionPath` and `partitionSpec` methods are needed.
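   For example (just a sketch of the wording):
   ```java
   /**
    * A {@link PartitionCommitPolicy.Context} implementation that is currently only used for
    * writing the success file, so only {@code partitionPath()} and {@code partitionSpec()}
    * are needed; the other methods are not expected to be called.
    */
   private static class CommitPolicyContextImpl implements PartitionCommitPolicy.Context {
   ```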



##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemConnectorOptions.java:
##########
@@ -245,6 +245,13 @@ public class FileSystemConnectorOptions {
                    .withDescription(
                            "The compaction target file size, the default value is the rolling file size.");
 
+    public static final ConfigOption<Boolean> HIVE_SINK_PARTITION_COMMIT_SUCCESS_FILE_ENABLED =

Review Comment:
   And I think we should also add a new option like `HIVE_SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME` in `HiveOptions`.
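   Something like this (just a sketch; the key and the `_SUCCESS` default are my assumptions):
   ```java
   // Sketch: companion option for the success-file name, next to the enabled flag in HiveOptions.
   public static final ConfigOption<String> HIVE_SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME =
           ConfigOptions.key("hive.sink.partition-commit.success-file.name")
                   .stringType()
                   .defaultValue("_SUCCESS")
                   .withDescription(
                           "The file name of the success-file written by the Hive batch sink.");
   ```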



##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionLoader.java:
##########
@@ -102,22 +112,27 @@ public void loadNonPartition(List<Path> srcDirs) throws Exception {
     public void loadEmptyPartition(LinkedHashMap<String, String> partSpec) throws Exception {
         Optional<Path> pathFromMeta = metaStore.getPartition(partSpec);
         if (pathFromMeta.isPresent() && !overwrite) {
+            createSuccessFile(partSpec, pathFromMeta.get());
             return;
         }
         Path path = new Path(metaStore.getLocationPath(), generatePartitionPath(partSpec));
         if (pathFromMeta.isPresent()) {
             fs.delete(pathFromMeta.get(), true);
             fs.mkdirs(path);
         }
+        createSuccessFile(partSpec, path);

Review Comment:
   Should we also call this method in `loadPartition` and `loadNonPartition`?
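   I.e. something like this at the end of both methods (just a sketch; `loadNonPartition`'s signature is taken from the hunk above, `loadPartition`'s parameter list is abbreviated):
   ```java
   // Sketch: call the same hook from the other load paths so committed partitions/tables
   // also get the success file.
   public void loadPartition(LinkedHashMap<String, String> partSpec, List<Path> srcDirs /* ... */)
           throws Exception {
       // ... existing logic that resolves the target partition `path` and moves the files ...
       createSuccessFile(partSpec, path);
   }

   public void loadNonPartition(List<Path> srcDirs) throws Exception {
       // ... existing logic that moves the files into the table location ...
       createSuccessFile(new LinkedHashMap<>(), metaStore.getLocationPath());
   }
   ```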


