luoyuxia commented on code in PR #20469:
URL: https://github.com/apache/flink/pull/20469#discussion_r942268552
##########
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java:
##########
@@ -469,7 +469,10 @@ public void testInsertDirectory() throws Exception {
                 .await();
         java.nio.file.Path[] files =
                 FileUtils.listFilesInDirectory(
-                        Paths.get(dataDir), (path) -> !path.toFile().isHidden())
+                        Paths.get(dataDir),
+                        (path) ->
+                                !path.toFile().isHidden()
+                                        && !path.toFile().getName().startsWith("_"))

Review Comment:
   Could we compare against `FileSystemConnectorOptions.SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME.defaultValue()` instead?



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java:
##########
@@ -337,6 +340,12 @@ private DataStreamSink<Row> createBatchSink(
         builder.setTempPath(
                 new org.apache.flink.core.fs.Path(toStagingDir(stagingParentDir, jobConf)));
         builder.setOutputFileConfig(fileNaming);
+        builder.setIdentifier(identifier);
+        builder.setPolicyKind(conf.get(HiveOptions.SINK_PARTITION_COMMIT_POLICY_KIND));

Review Comment:
   Do we need all these methods (`setPolicyKind`, `setCustomClass`, `setSuccessFileName`) on the builder? What if we need to pass a new configuration to this builder in the future? I think we can use a single method, `setTableOption`, and read the values we need when building the `FileSystemOutputFormat`, which is more flexible.



##########
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemCommitterTest.java:
##########
@@ -173,7 +206,14 @@ void testEmptyPartition() throws Exception {
         assertThat(emptyPartitionFile).exists();
         assertThat(emptyPartitionFile).isDirectory();
-        assertThat(emptyPartitionFile).isEmptyDirectory();
+        files =
+                FileUtils.listFilesInDirectory(
+                        Paths.get(emptyPartitionFile.getPath()),
+                        (path) ->
+                                !path.toFile().isHidden()
+                                        && !path.toFile().getName().startsWith("_"))

Review Comment:
   Use `!path.toFile().getName().equals("_SUCCESS")`?
   Also, it would be better to make `_SUCCESS` a static constant if we use it in several test methods.



##########
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemCommitterTest.java:
##########
@@ -198,6 +240,81 @@ void testEmptyPartition() throws Exception {
         assertThat(new File(emptyPartitionFile, "f1")).exists();
     }
+    @Test
+    public void testWriteSuccessFile() throws Exception {

Review Comment:
   Do we need this test? Can we check the success file in the existing methods `testEmptyPartition`, `testPartition`, and `testNotPartition` instead?



##########
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemCommitterTest.java:
##########
@@ -40,12 +44,24 @@ class FileSystemCommitterTest {
     private FileSystemFactory fileSystemFactory = FileSystem::get;
     private TableMetaStoreFactory metaStoreFactory;
+    private List<PartitionCommitPolicy> policies;
+    private ObjectIdentifier identifier;
     @TempDir private java.nio.file.Path outputPath;
     @TempDir private java.nio.file.Path path;
     @BeforeEach
     public void before() throws IOException {
         metaStoreFactory = new TestMetaStoreFactory(new Path(outputPath.toString()));
+        policies =
+                PartitionCommitPolicy.createPolicyChain(
+                        Thread.currentThread().getContextClassLoader(),
+                        "metastore,success-file",
+                        null,
+                        "_SUCCESS",
+                        () -> {
+                            return new LocalFileSystem();

Review Comment:
   This lambda can be replaced with `LocalFileSystem::new`.



##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionLoader.java:
##########
@@ -154,8 +166,82 @@ private void moveFiles(List<Path> srcDirs, Path destDir) throws Exception {
         }
     }
+    /**
+     * Reuse of PartitionCommitPolicy mechanisms. The default in Batch mode is metastore and

Review Comment:
   This comment is not correct: for the filesystem connector, there is no default policy at all. I think the comment should just explain what this method does.
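The `setTableOption` suggestion on `HiveTableSink` can be illustrated with a small, dependency-free sketch. Everything here is hypothetical: `TableOptionBuilderSketch`, its `Builder`, and `CommitSettings` are stand-ins, not Flink's `FileSystemOutputFormat.Builder`. The string keys mirror Flink's filesystem connector option keys (`sink.partition-commit.policy.kind`, `sink.partition-commit.policy.class`, `sink.partition-commit.success-file.name`), and the `_SUCCESS` fallback matches the default of the last one. The idea is that the builder stores the raw table options and resolves the values it needs only in `build()`, so adding an option later does not require a new setter.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch of the "single setTableOption" builder design; this is not Flink's
 * actual FileSystemOutputFormat.Builder.
 */
final class TableOptionBuilderSketch {

    // Keys mirror Flink's filesystem connector option keys.
    static final String POLICY_KIND = "sink.partition-commit.policy.kind";
    static final String POLICY_CLASS = "sink.partition-commit.policy.class";
    static final String SUCCESS_FILE_NAME = "sink.partition-commit.success-file.name";
    static final String DEFAULT_SUCCESS_FILE_NAME = "_SUCCESS";

    /** Hypothetical holder for the values the output format needs when it is built. */
    static final class CommitSettings {
        final String policyKind; // null when no commit policy is configured
        final String customClass; // null when no custom policy class is configured
        final String successFileName;

        CommitSettings(String policyKind, String customClass, String successFileName) {
            this.policyKind = policyKind;
            this.customClass = customClass;
            this.successFileName = successFileName;
        }
    }

    static final class Builder {
        private final Map<String, String> tableOptions = new HashMap<>();

        // One setter for all table options instead of one setter per option.
        Builder setTableOption(Map<String, String> options) {
            tableOptions.putAll(options);
            return this;
        }

        // Values are looked up only at build time, so a new option needs no new setter.
        CommitSettings build() {
            return new CommitSettings(
                    tableOptions.get(POLICY_KIND),
                    tableOptions.get(POLICY_CLASS),
                    tableOptions.getOrDefault(SUCCESS_FILE_NAME, DEFAULT_SUCCESS_FILE_NAME));
        }
    }

    private TableOptionBuilderSketch() {}
}
```

For example, passing `Collections.singletonMap(TableOptionBuilderSketch.POLICY_KIND, "metastore,success-file")` to `setTableOption` yields that policy kind, no custom class, and the `_SUCCESS` file name; a later option only needs a new lookup in `build()`.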


##########
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java:
##########
@@ -496,6 +496,48 @@ public void testWritingNoDataToPartition() throws Exception {
         assertBatch("target_table", Arrays.asList());
     }
+    @Test
+    public void testWriteSuccessFile() throws Exception {

Review Comment:
   Also, could you please add tests to verify the following two cases:
   1. writing the success file is disabled;
   2. the success file name is changed.



##########
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java:
##########
@@ -496,6 +496,48 @@ public void testWritingNoDataToPartition() throws Exception {
         assertBatch("target_table", Arrays.asList());
     }
+    @Test
+    public void testWriteSuccessFile() throws Exception {
+        TableEnvironment tEnv = HiveTestUtils.createTableEnvInBatchMode(SqlDialect.HIVE);
+        tEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
+        tEnv.useCatalog(hiveCatalog.getName());
+
+        String successFileName = tEnv.getConfig().get(SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME);
+
+        tEnv.executeSql("CREATE TABLE zm_test_non_partition_table (name string)");
+        tEnv.executeSql(
+                "CREATE TABLE zm_test_partition_table (name string) PARTITIONED BY (`dt` string)");
+
+        // test partition table
+        String partitionTablePath =

Review Comment:
   To get the path of the partitioned table, you can use:
   ```java
   String warehouse =
           hiveCatalog.getHiveConf().get(HiveConf.ConfVars.METASTOREWAREHOUSE.varname);
   String partitionTablePath = warehouse + "/zm_test_partition_table";
   ```



##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionLoader.java:
##########
@@ -154,8 +166,82 @@ private void moveFiles(List<Path> srcDirs, Path destDir) throws Exception {
         }
     }
+    /**
+     * Reuse of PartitionCommitPolicy mechanisms. The default in Batch mode is metastore and
+     * success-file.
+     */
+    private void commitPartition(LinkedHashMap<String, String> partitionSpec, Path path)
+            throws Exception {
+        PartitionCommitPolicy.Context context = new CommitPolicyContextImpl(partitionSpec, path);
+        for (PartitionCommitPolicy policy : policies) {
+            if (policy instanceof MetastoreCommitPolicy) {
+                if (partitionSpec.isEmpty()) {
+                    // Non partition table skip commit meta data.
+                    continue;
+                }
+                ((MetastoreCommitPolicy) policy).setMetastore(metaStore);
+            }
+            policy.commit(context);
+        }
+    }
+
     @Override
     public void close() throws IOException {
         metaStore.close();
     }
+
+    /**
+     * Reuse the PartitionCommitPolicy in Hive batch sink. We only need partitionPath and

Review Comment:
   The comment is a little confusing. I don't think it's necessary: the class is quite simple, and the `PartitionCommitPolicy.Context` interface already documents what it provides.



##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionLoader.java:
##########
@@ -154,8 +166,82 @@ private void moveFiles(List<Path> srcDirs, Path destDir) throws Exception {
         }
     }
+    /**
+     * Reuse of PartitionCommitPolicy mechanisms. The default in Batch mode is metastore and
+     * success-file.
+     */
+    private void commitPartition(LinkedHashMap<String, String> partitionSpec, Path path)
+            throws Exception {
+        PartitionCommitPolicy.Context context = new CommitPolicyContextImpl(partitionSpec, path);
+        for (PartitionCommitPolicy policy : policies) {
+            if (policy instanceof MetastoreCommitPolicy) {
+                if (partitionSpec.isEmpty()) {
+                    // Non partition table skip commit meta data.
+                    continue;
+                }
+                ((MetastoreCommitPolicy) policy).setMetastore(metaStore);
+            }
+            policy.commit(context);
+        }
+    }
+
     @Override
     public void close() throws IOException {
         metaStore.close();
     }
+
+    /**
+     * Reuse the PartitionCommitPolicy in Hive batch sink. We only need partitionPath and

Review Comment:
   The word `Hive` shouldn't appear in the filesystem connector.
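The `commitPartition` loop under review, and the suggestion that its Javadoc should simply state what it does, can be modelled without Flink dependencies. This is a hypothetical sketch: `CommitLoopSketch`, `Context`, `CommitPolicy`, `MetastorePolicy`, and `SuccessFilePolicy` are stand-ins for `PartitionCommitPolicy`, its `Context`, `MetastoreCommitPolicy`, and the success-file policy. The metastore stand-in only records the partition specs it would commit, and the success-file stand-in writes an empty marker file with `java.nio.file.Files`; neither reflects the real Flink implementations.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;

/** Hypothetical, dependency-free model of the commitPartition loop under review. */
final class CommitLoopSketch {

    /** Stand-in for PartitionCommitPolicy.Context, reduced to what this sketch uses. */
    interface Context {
        LinkedHashMap<String, String> partitionSpec();

        Path partitionPath();
    }

    /** Stand-in for PartitionCommitPolicy. */
    interface CommitPolicy {
        void commit(Context context) throws IOException;
    }

    /** Stand-in for a metastore policy: records specs instead of talking to a metastore. */
    static final class MetastorePolicy implements CommitPolicy {
        final List<LinkedHashMap<String, String>> committed = new ArrayList<>();

        @Override
        public void commit(Context context) {
            committed.add(context.partitionSpec());
        }
    }

    /** Stand-in for a success-file policy: writes an empty marker file into the directory. */
    static final class SuccessFilePolicy implements CommitPolicy {
        private final String fileName;

        SuccessFilePolicy(String fileName) {
            this.fileName = fileName;
        }

        @Override
        public void commit(Context context) throws IOException {
            Files.write(context.partitionPath().resolve(fileName), new byte[0]);
        }
    }

    /**
     * Commits one partition by applying each policy in order. The metastore policy is skipped
     * when the partition spec is empty, i.e. for a non-partitioned table.
     */
    static void commitPartition(
            List<CommitPolicy> policies, LinkedHashMap<String, String> spec, Path path)
            throws IOException {
        Context context =
                new Context() {
                    @Override
                    public LinkedHashMap<String, String> partitionSpec() {
                        return spec;
                    }

                    @Override
                    public Path partitionPath() {
                        return path;
                    }
                };
        for (CommitPolicy policy : policies) {
            if (policy instanceof MetastorePolicy && spec.isEmpty()) {
                continue; // nothing to register for a non-partitioned table
            }
            policy.commit(context);
        }
    }

    private CommitLoopSketch() {}
}
```

With a chain of `MetastorePolicy` followed by `SuccessFilePolicy("_SUCCESS")`, an empty spec produces only the marker file, while a spec such as `dt=2022-08-09` is also recorded by the metastore stand-in.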