luoyuxia commented on code in PR #20469:
URL: https://github.com/apache/flink/pull/20469#discussion_r942268552


##########
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java:
##########
@@ -469,7 +469,10 @@ public void testInsertDirectory() throws Exception {
                 .await();
         java.nio.file.Path[] files =
                 FileUtils.listFilesInDirectory(
-                                Paths.get(dataDir), (path) -> !path.toFile().isHidden())
+                                Paths.get(dataDir),
+                                (path) ->
+                                        !path.toFile().isHidden()
+                                                && !path.toFile().getName().startsWith("_"))

Review Comment:
   Could we compare the file name against `FileSystemConnectorOptions.SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME.defaultValue()` instead?



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java:
##########
@@ -337,6 +340,12 @@ private DataStreamSink<Row> createBatchSink(
         builder.setTempPath(
                 new org.apache.flink.core.fs.Path(toStagingDir(stagingParentDir, jobConf)));
         builder.setOutputFileConfig(fileNaming);
+        builder.setIdentifier(identifier);
+        builder.setPolicyKind(conf.get(HiveOptions.SINK_PARTITION_COMMIT_POLICY_KIND));

Review Comment:
   Do we need all of these methods (`setPolicyKind`, `setCustomClass`, `setSuccessFileName`) on the builder? What if we need to pass another configuration option to this builder in the future?
   I think we can use a single method, `setTableOption`, and read the values we need when building the `FileSystemOutputFormat`, which would be more flexible.
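A minimal sketch of the suggested shape, assuming the builder receives the raw table options as a `Map<String, String>` and resolves individual values only in `build()`. `OutputFormatBuilder` and `Settings` are hypothetical names, not Flink's `FileSystemOutputFormat.Builder`; the keys are the filesystem connector's `sink.partition-commit.*` option keys, and `_SUCCESS` is used as the fallback success file name.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical builder illustrating the "single setter for table options" idea.
 * It is not Flink's FileSystemOutputFormat.Builder.
 */
class OutputFormatBuilder {

    // Keys of the filesystem sink options; values are resolved lazily in build().
    static final String POLICY_KIND = "sink.partition-commit.policy.kind";
    static final String POLICY_CLASS = "sink.partition-commit.policy.class";
    static final String SUCCESS_FILE_NAME = "sink.partition-commit.success-file.name";

    private Map<String, String> tableOptions = Collections.emptyMap();

    /** Single entry point: supporting a new option does not require a new setter. */
    OutputFormatBuilder setTableOption(Map<String, String> options) {
        this.tableOptions = new HashMap<>(options);
        return this;
    }

    /** Picks out only the settings the output format needs, applying fallbacks. */
    Settings build() {
        return new Settings(
                tableOptions.get(POLICY_KIND), // null means no commit policy configured
                tableOptions.get(POLICY_CLASS),
                tableOptions.getOrDefault(SUCCESS_FILE_NAME, "_SUCCESS"));
    }

    /** Immutable holder for the resolved settings (hypothetical). */
    static final class Settings {
        final String policyKind;
        final String policyClass;
        final String successFileName;

        Settings(String policyKind, String policyClass, String successFileName) {
            this.policyKind = policyKind;
            this.policyClass = policyClass;
            this.successFileName = successFileName;
        }
    }
}
```

With this shape, a new option only touches `build()` and the code that consumes the value, not the builder's public surface.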



##########
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemCommitterTest.java:
##########
@@ -173,7 +206,14 @@ void testEmptyPartition() throws Exception {
 
         assertThat(emptyPartitionFile).exists();
         assertThat(emptyPartitionFile).isDirectory();
-        assertThat(emptyPartitionFile).isEmptyDirectory();
+        files =
+                FileUtils.listFilesInDirectory(
+                                Paths.get(emptyPartitionFile.getPath()),
+                                (path) ->
+                                        !path.toFile().isHidden()
+                                                && !path.toFile().getName().startsWith("_"))

Review Comment:
   Could we use `!path.toFile().getName().equals("_SUCCESS")` instead?
   Also, it would be better to make `_SUCCESS` a static constant if several test methods use it.
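A minimal sketch combining both suggestions: the success file name lives in one static constant, and the filter compares the exact file name instead of rejecting every name that starts with `_`. `DataFileFilter` and `listDataFiles` are hypothetical; in the actual tests the predicate would be passed to Flink's `FileUtils.listFilesInDirectory`, and the constant could be initialized from `FileSystemConnectorOptions.SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME.defaultValue()`. `Files.list` (non-recursive) is used here only so the sketch compiles without Flink.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical test helper: lists the data files of a directory, ignoring
// hidden files and the partition-commit success file.
final class DataFileFilter {

    private DataFileFilter() {}

    // In the real tests this could come from
    // FileSystemConnectorOptions.SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME.defaultValue().
    static final String SUCCESS_FILE_NAME = "_SUCCESS";

    // Accepts visible files whose name is not exactly the success file name.
    static final Predicate<Path> IS_DATA_FILE =
            path -> {
                File file = path.toFile();
                return !file.isHidden() && !file.getName().equals(SUCCESS_FILE_NAME);
            };

    // Lists the direct children of dir that pass IS_DATA_FILE.
    static List<Path> listDataFiles(Path dir) throws IOException {
        try (Stream<Path> entries = Files.list(dir)) {
            return entries.filter(IS_DATA_FILE).collect(Collectors.toList());
        }
    }
}
```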



##########
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemCommitterTest.java:
##########
@@ -198,6 +240,81 @@ void testEmptyPartition() throws Exception {
         assertThat(new File(emptyPartitionFile, "f1")).exists();
     }
 
+    @Test
+    public void testWriteSuccessFile() throws Exception {

Review Comment:
   Do we need this test? Can we check the success file in `testEmptyPartition`, `testPartition`, and `testNotPartition` instead?
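A sketch of how the existing tests could cover the success file without a dedicated test method, assuming each of them knows the directory it just committed (a partition directory, or the table directory for a non-partitioned table). `SuccessFileAssertions.assertSuccessFileWritten` is a hypothetical helper written in plain Java to stay self-contained; inside `FileSystemCommitterTest` an AssertJ one-liner such as `assertThat(new File(dir, "_SUCCESS")).exists()`, in the style the file already uses, would do the same.

```java
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper that testPartition, testNotPartition and testEmptyPartition
// could call after committing, instead of a separate testWriteSuccessFile.
final class SuccessFileAssertions {

    private SuccessFileAssertions() {}

    /** Fails unless {@code dir} contains a regular file named {@code successFileName}. */
    static void assertSuccessFileWritten(Path dir, String successFileName) {
        Path successFile = dir.resolve(successFileName);
        if (!Files.isRegularFile(successFile)) {
            throw new AssertionError("Expected success file at " + successFile);
        }
    }
}
```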



##########
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemCommitterTest.java:
##########
@@ -40,12 +44,24 @@ class FileSystemCommitterTest {
     private FileSystemFactory fileSystemFactory = FileSystem::get;
 
     private TableMetaStoreFactory metaStoreFactory;
+    private List<PartitionCommitPolicy> policies;
+    private ObjectIdentifier identifier;
     @TempDir private java.nio.file.Path outputPath;
     @TempDir private java.nio.file.Path path;
 
     @BeforeEach
     public void before() throws IOException {
         metaStoreFactory = new TestMetaStoreFactory(new Path(outputPath.toString()));
+        policies =
+                PartitionCommitPolicy.createPolicyChain(
+                        Thread.currentThread().getContextClassLoader(),
+                        "metastore,success-file",
+                        null,
+                        "_SUCCESS",
+                        () -> {
+                            return new LocalFileSystem();

Review Comment:
   This lambda can be replaced with `LocalFileSystem::new`.
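Assuming the last argument of `createPolicyChain` is a `java.util.function.Supplier` (the zero-argument lambda suggests so), the block lambda and the constructor reference are interchangeable. `FakeLocalFileSystem` is a hypothetical stand-in for Flink's `LocalFileSystem` so the snippet compiles on its own.

```java
import java.util.function.Supplier;

// Stand-in for org.apache.flink.core.fs.local.LocalFileSystem.
final class FakeLocalFileSystem {}

final class MethodReferenceDemo {

    private MethodReferenceDemo() {}

    // Block lambda, as written in the diff.
    static final Supplier<FakeLocalFileSystem> LAMBDA =
            () -> {
                return new FakeLocalFileSystem();
            };

    // Equivalent constructor reference: each get() also creates a new instance.
    static final Supplier<FakeLocalFileSystem> CONSTRUCTOR_REF = FakeLocalFileSystem::new;
}
```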



##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionLoader.java:
##########
@@ -154,8 +166,82 @@ private void moveFiles(List<Path> srcDirs, Path destDir) throws Exception {
         }
     }
 
+    /**
+     * Reuse of PartitionCommitPolicy mechanisms. The default in Batch mode is metastore and

Review Comment:
   The comment is not correct: for the filesystem connector, no commit policy is configured by default.
   I think we can just explain what this method does.
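A sketch of a Javadoc that only explains what the method does, attached to a simplified version of the `commitPartition` loop from this PR. `CommitPolicy`, `RecordingMetastorePolicy`, `PartitionCommitter` and the `String` path are hypothetical stand-ins for `PartitionCommitPolicy`, `MetastoreCommitPolicy`, `PartitionLoader` and Flink's `Path`; the Javadoc wording is only a suggestion, and the metastore wiring (`setMetastore`) is omitted.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;

// Simplified stand-in for PartitionCommitPolicy.
interface CommitPolicy {
    void commit(LinkedHashMap<String, String> partitionSpec, String path);
}

// Simplified stand-in for MetastoreCommitPolicy that records committed paths.
class RecordingMetastorePolicy implements CommitPolicy {
    final List<String> committed = new ArrayList<>();

    @Override
    public void commit(LinkedHashMap<String, String> partitionSpec, String path) {
        committed.add(path);
    }
}

final class PartitionCommitter {
    private final List<CommitPolicy> policies;

    PartitionCommitter(List<CommitPolicy> policies) {
        this.policies = policies;
    }

    /**
     * Commits the partition at {@code path} by applying the configured commit policies in
     * order. The metastore policy is skipped when {@code partitionSpec} is empty, i.e. for a
     * non-partitioned table.
     */
    void commitPartition(LinkedHashMap<String, String> partitionSpec, String path) {
        for (CommitPolicy policy : policies) {
            if (policy instanceof RecordingMetastorePolicy && partitionSpec.isEmpty()) {
                continue; // no partition to register in the metastore
            }
            policy.commit(partitionSpec, path);
        }
    }
}
```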



##########
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java:
##########
@@ -496,6 +496,48 @@ public void testWritingNoDataToPartition() throws Exception {
         assertBatch("target_table", Arrays.asList());
     }
 
+    @Test
+    public void testWriteSuccessFile() throws Exception {

Review Comment:
   Also, could you please add tests to verify the following two cases:
   1: writing the success file is disabled
   2: the success file name is changed
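The two requested cases can be stated as concrete checks. The sketch models them with a hypothetical `SuccessFileScenarios.commit` helper that stands in for the success-file commit policy: it writes an empty marker file with the configured name only when the comma-separated policy kind lists `success-file`. In the Hive ITCase itself, case 1 would configure a policy kind without `success-file` (for example `metastore` alone) and assert that no success file appears, and case 2 would set a non-default success file name (the `_MY_SUCCESS` used in the checks is arbitrary) and assert that this file, not `_SUCCESS`, is written.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;

// Hypothetical model of the success-file policy, used only to spell out the two
// scenarios; not Flink's SuccessFileCommitPolicy.
final class SuccessFileScenarios {

    private SuccessFileScenarios() {}

    /** Writes an empty success file into {@code dir} iff policyKind lists "success-file". */
    static void commit(Path dir, String policyKind, String successFileName) throws IOException {
        boolean writeSuccessFile =
                Arrays.stream(policyKind.split(","))
                        .map(String::trim)
                        .anyMatch("success-file"::equals);
        if (writeSuccessFile) {
            // Overwrites an existing marker, so repeated commits are harmless.
            Files.write(dir.resolve(successFileName), new byte[0]);
        }
    }
}
```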



##########
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java:
##########
@@ -496,6 +496,48 @@ public void testWritingNoDataToPartition() throws Exception {
         assertBatch("target_table", Arrays.asList());
     }
 
+    @Test
+    public void testWriteSuccessFile() throws Exception {
+        TableEnvironment tEnv = HiveTestUtils.createTableEnvInBatchMode(SqlDialect.HIVE);
+        tEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
+        tEnv.useCatalog(hiveCatalog.getName());
+
+        String successFileName = tEnv.getConfig().get(SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME);
+
+        tEnv.executeSql("CREATE TABLE zm_test_non_partition_table (name string)");
+        tEnv.executeSql(
+                "CREATE TABLE zm_test_partition_table (name string) PARTITIONED BY (`dt` string)");
+
+        // test partition table
+        String partitionTablePath =

Review Comment:
   To get the path of the partitioned table, you can use
   ```java
   String warehouse = hiveCatalog.getHiveConf().get(HiveConf.ConfVars.METASTOREWAREHOUSE.varname);
   String partitionTablePath = warehouse + "/zm_test_partition_table";
   ```



##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionLoader.java:
##########
@@ -154,8 +166,82 @@ private void moveFiles(List<Path> srcDirs, Path destDir) throws Exception {
         }
     }
 
+    /**
+     * Reuse of PartitionCommitPolicy mechanisms. The default in Batch mode is metastore and
+     * success-file.
+     */
+    private void commitPartition(LinkedHashMap<String, String> partitionSpec, Path path)
+            throws Exception {
+        PartitionCommitPolicy.Context context = new CommitPolicyContextImpl(partitionSpec, path);
+        for (PartitionCommitPolicy policy : policies) {
+            if (policy instanceof MetastoreCommitPolicy) {
+                if (partitionSpec.isEmpty()) {
+                    // Non partition table skip commit meta data.
+                    continue;
+                }
+                ((MetastoreCommitPolicy) policy).setMetastore(metaStore);
+            }
+            policy.commit(context);
+        }
+    }
+
     @Override
     public void close() throws IOException {
         metaStore.close();
     }
+
+    /**
+     * Reuse the PartitionCommitPolicy in Hive batch sink. We only need partitionPath and

Review Comment:
   The comment is a little of confused.  I think the comment isn't nesscessay 
since it's quite simple and the interface `PartitionCommitPolicy.Context` has 
done some explanation.



##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionLoader.java:
##########
@@ -154,8 +166,82 @@ private void moveFiles(List<Path> srcDirs, Path destDir) throws Exception {
         }
     }
 
+    /**
+     * Reuse of PartitionCommitPolicy mechanisms. The default in Batch mode is metastore and
+     * success-file.
+     */
+    private void commitPartition(LinkedHashMap<String, String> partitionSpec, Path path)
+            throws Exception {
+        PartitionCommitPolicy.Context context = new CommitPolicyContextImpl(partitionSpec, path);
+        for (PartitionCommitPolicy policy : policies) {
+            if (policy instanceof MetastoreCommitPolicy) {
+                if (partitionSpec.isEmpty()) {
+                    // Non partition table skip commit meta data.
+                    continue;
+                }
+                ((MetastoreCommitPolicy) policy).setMetastore(metaStore);
+            }
+            policy.commit(context);
+        }
+    }
+
     @Override
     public void close() throws IOException {
         metaStore.close();
     }
+
+    /**
+     * Reuse the PartitionCommitPolicy in Hive batch sink. We only need partitionPath and

Review Comment:
   The word `Hive` shouldn't appear in the filesystem connector.



-- 
This is an automated message from the Apache Git Service.