XComp commented on code in PR #24390:
URL: https://github.com/apache/flink/pull/24390#discussion_r1511414346


##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/utils/PathUtils.java:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.table.utils;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.UUID;
+
+/** Path utils for file system. */
+public class PathUtils {
+
+    public static Path getStagingPath(Path path) {
+        // Add a random UUID to prevent multiple sinks from sharing the same staging dir.
+        // Please see FLINK-29114 for more details
+        Path stagingDir =
+                new Path(
+                        path,
+                        String.join(

Review Comment:
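   I'm not sure whether that's actually what you want. The resulting String would be something like `.staging__1709568597254_2cf73aab-39a4-440a-b81c-216be9635bb8` (i.e. with a double underscore), because `String.join` puts the `_` delimiter after the `.staging_` prefix, which already ends in an underscore.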
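
A minimal, self-contained sketch of the naming issue described above. The class and method names (`StagingNameDemo`, `joinedName`, `formattedName`) are hypothetical and only serve the illustration; the two expressions mirror the `String.join` call in the diff and a `String.format` alternative.

```java
public class StagingNameDemo {

    // Mirrors the code under review: the delimiter is added after a prefix
    // that already ends in an underscore.
    static String joinedName(long timestamp, String uuid) {
        return String.join("_", ".staging_", String.valueOf(timestamp), uuid);
    }

    // Alternative with a single format string, so each separator appears once.
    static String formattedName(long timestamp, String uuid) {
        return String.format(".staging_%d_%s", timestamp, uuid);
    }

    public static void main(String[] args) {
        String uuid = "2cf73aab-39a4-440a-b81c-216be9635bb8";
        // Name with a double underscore after ".staging".
        System.out.println(joinedName(1709568597254L, uuid));
        // Name with a single underscore after ".staging".
        System.out.println(formattedName(1709568597254L, uuid));
    }
}
```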



##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/utils/PathUtils.java:
##########
@@ -0,0 +1,51 @@
+
+package org.apache.flink.connector.file.table.utils;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.UUID;
+
+/** Path utils for file system. */
+public class PathUtils {
+
+    public static Path getStagingPath(Path path) {

Review Comment:
   ```suggestion
       public static Path getStagingPath(Path path) {
           return getStagingPath(
                   path,
                   () ->
                           String.format(
                                   ".staging_%d_%s", System.currentTimeMillis(), UUID.randomUUID()));
       }

       @VisibleForTesting
       static Path getStagingPath(Path path, Supplier<String> suffixSupplier) {
   ```
   The current test is "theoretically" flaky because the directory name depends on the current time and a random UUID. We can change that by passing the name in through a callback and checking for the Precondition. That would enable us to create the following test:
   
   ```java
       @Test
       void testReusingStagingDirFails(@TempDir Path tmpDir) throws IOException {
           final String subfolderName = "directory-name";
           // Pre-create the directory that the supplier will return as the staging dir name.
           Files.createDirectory(tmpDir.resolve(subfolderName));
           assertThatThrownBy(
                           () ->
                                   PathUtils.getStagingPath(
                                           org.apache.flink.core.fs.Path.fromLocalFile(
                                                   tmpDir.toFile()),
                                           () -> subfolderName))
                   .isInstanceOf(IllegalStateException.class);
       }
   ```
   That test would reveal that our assumption was actually wrong: `mkdirs` does not return `false` if the directory already exists. :-/
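
For illustration, here is a self-contained sketch of the callback idea using only `java.nio.file`, so it runs without Flink on the classpath. `StagingPathSketch` is a hypothetical stand-in, not the actual `PathUtils`, and the explicit existence check is an assumption about how the precondition could be expressed rather than the code in this PR.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.UUID;
import java.util.function.Supplier;

/** Hypothetical stand-in for PathUtils that works on java.nio.file.Path. */
public class StagingPathSketch {

    /** Production entry point: the directory name is time- and UUID-based. */
    public static Path getStagingPath(Path base) {
        return getStagingPath(
                base,
                () -> String.format(
                        ".staging_%d_%s", System.currentTimeMillis(), UUID.randomUUID()));
    }

    /** Test entry point: the caller controls the directory name via the supplier. */
    public static Path getStagingPath(Path base, Supplier<String> nameSupplier) {
        Path stagingDir = base.resolve(nameSupplier.get());
        // Fail explicitly on an existing directory instead of relying on
        // the return value of a mkdirs-style call.
        if (Files.exists(stagingDir)) {
            throw new IllegalStateException("Staging dir " + stagingDir + " already exists");
        }
        try {
            // Creates the directory and any missing parents.
            return Files.createDirectories(stagingDir);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With a fixed supplier such as `() -> "directory-name"`, a test can create the directory up front and assert on the `IllegalStateException` deterministically, without depending on the clock or on UUID generation.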



##########
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/utils/PathUtilsTest.java:
##########
@@ -0,0 +1,49 @@
+
+package org.apache.flink.connector.file.table.utils;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.table.FileSystemConnectorOptions;
+import org.apache.flink.core.fs.Path;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+
+/** Test for {@link PathUtils}. */
+public class PathUtilsTest {
+
+    @Test
+    void testUniqueStagingDirectory(@TempDir File tmpDir) {
+        final Configuration config = new Configuration();
+        config.set(FileSystemConnectorOptions.PATH, tmpDir.getAbsolutePath());
+        assertThat(tmpDir.listFiles()).isEmpty();

Review Comment:
   ```suggestion
   ```
   We wouldn't need this bit anymore, would we?



##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/utils/PathUtils.java:
##########
@@ -0,0 +1,51 @@
+
+package org.apache.flink.connector.file.table.utils;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.UUID;
+
+/** Path utils for file system. */
+public class PathUtils {
+
+    public static Path getStagingPath(Path path) {
+        // Add a random UUID to prevent multiple sinks from sharing the same staging dir.
+        // Please see FLINK-29114 for more details
+        Path stagingDir =
+                new Path(
+                        path,
+                        String.join(
+                                "_",
+                                ".staging_",
+                                String.valueOf(System.currentTimeMillis()),
+                                UUID.randomUUID().toString()));
+        try {
+            FileSystem fs = stagingDir.getFileSystem();
+            Preconditions.checkState(
+                    fs.mkdirs(stagingDir), "Failed to create staging dir " + stagingDir);

Review Comment:
   ```suggestion
               Preconditions.checkState(
                       !fs.exists(stagingDir), "Failed to create staging dir %s", stagingDir);
               fs.mkdirs(stagingDir);
   ```
   Looks like that is what we want here. I'm a bit hesitant, though, because checking for existence can be expensive in object storage filesystems like S3 with big buckets. :thinking: But to be fair, it is not a regression because the previous code did the same.



##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/utils/PathUtils.java:
##########
@@ -0,0 +1,51 @@
+
+package org.apache.flink.connector.file.table.utils;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.UUID;
+
+/** Path utils for file system. */
+public class PathUtils {

Review Comment:
   `HiveTableSink#createBatchCompactSink` uses the random staging directory for the `BatchFileWriter` instantiation in [line 490ff](https://github.com/apache/flink/blob/06caf191991b944769cb7b02769b5d769064febf/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java#L492). The `BatchFileWriter` instantiates a `PartitionTempFileManager` in the operator's open method ([BatchFileWriter:85](https://github.com/apache/flink/blob/0f3470db83c1fddba9ac9a7299b1e61baab4ff12/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/batch/compact/BatchFileWriter.java#L85)). The `PartitionTempFileManager` creates a subfolder for the attempt and deletes any existing directory to ensure that no side effects occur.
   
   That said, the two code locations seem to be independent of each other. Moving the staging directory into the OutputFormat still sounds like a reasonable thing to do. But that's your call in the end.


