XComp commented on code in PR #24390: URL: https://github.com/apache/flink/pull/24390#discussion_r1511414346
########## flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/utils/PathUtils.java: ########## @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.file.table.utils; + +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.UUID; + +/** Path utils for file system. */ +public class PathUtils { + + public static Path getStagingPath(Path path) { + // Add a random UUID to prevent multiple sinks from sharing the same staging dir. + // Please see FLINK-29114 for more details + Path stagingDir = + new Path( + path, + String.join( Review Comment: I'm not sure whether that's actually what you want. The resulting String would be something like `.staging__1709568597254_2cf73aab-39a4-440a-b81c-216be9635bb8` (i.e. double underscore). ########## flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/utils/PathUtils.java: ########## @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.file.table.utils; + +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.UUID; + +/** Path utils for file system. */ +public class PathUtils { + + public static Path getStagingPath(Path path) { Review Comment: ```suggestion public static Path getStagingPath(Path path) { return getStagingPath( path, () -> String.format( ".staging_%d_%s", System.currentTimeMillis(), UUID.randomUUID())); } @VisibleForTesting static Path getStagingPath(Path path, Supplier<String> suffixSupplier) { ``` The current test is "theoretically" flaky, because it relies on the randomly generated name. We could fix that by injecting the staging-directory name through a callback, which lets a test force a name collision and check that the precondition fails.
That would enable us to create the following test: ```java @Test void testReusingStagingDirFails(@TempDir Path tmpDir) throws IOException { final String subfolderName = "directory-name"; Files.createDirectory(tmpDir.resolve(subfolderName)); assertThatThrownBy( () -> PathUtils.getStagingPath( org.apache.flink.core.fs.Path.fromLocalFile( tmpDir.toFile()), () -> subfolderName)) .isInstanceOf(IllegalStateException.class); } ``` And that test would reveal that our assumption that `mkdirs` returns `false` if the directory already exists was actually wrong. :-/ ########## flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/utils/PathUtilsTest.java: ########## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.connector.file.table.utils; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.file.table.FileSystemConnectorOptions; +import org.apache.flink.core.fs.Path; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; + +/** Test for {@link PathUtils}. */ +public class PathUtilsTest { + + @Test + void testUniqueStagingDirectory(@TempDir File tmpDir) { + final Configuration config = new Configuration(); + config.set(FileSystemConnectorOptions.PATH, tmpDir.getAbsolutePath()); + assertThat(tmpDir.listFiles()).isEmpty(); Review Comment: ```suggestion ``` we wouldn't need this bit anymore, would we? ########## flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/utils/PathUtils.java: ########## @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.file.table.utils; + +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.UUID; + +/** Path utils for file system. */ +public class PathUtils { + + public static Path getStagingPath(Path path) { + // Add a random UUID to prevent multiple sinks from sharing the same staging dir. + // Please see FLINK-29114 for more details + Path stagingDir = + new Path( + path, + String.join( + "_", + ".staging_", + String.valueOf(System.currentTimeMillis()), + UUID.randomUUID().toString())); + try { + FileSystem fs = stagingDir.getFileSystem(); + Preconditions.checkState( + fs.mkdirs(stagingDir), "Failed to create staging dir " + stagingDir); Review Comment: ```suggestion Preconditions.checkState( !fs.exists(stagingDir), "Failed to create staging dir %s", stagingDir); fs.mkdirs(stagingDir); ``` Looks like that is what we want here. I'm a bit hesitant here because checking for non-existence can be expensive in object storage filesystems like S3 with big buckets. :thinking: But to be fair, it is not a regression because the previous code did the same. ########## flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/utils/PathUtils.java: ########## @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.file.table.utils; + +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.UUID; + +/** Path utils for file system. */ +public class PathUtils { Review Comment: `HiveTableSink#createBatchCompactSink` uses the random staging directory for the `BatchFileWriter` instantiation in [line 490ff](https://github.com/apache/flink/blob/06caf191991b944769cb7b02769b5d769064febf/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java#L492). The `BatchFileWriter` instantiates a `PartitionTempFileManager` in the operator's open method ([BatchFileWriter:85](https://github.com/apache/flink/blob/0f3470db83c1fddba9ac9a7299b1e61baab4ff12/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/batch/compact/BatchFileWriter.java#L85)). The `PartitionTempFileManager` creates a subfolder for the attempt and deletes any existing directory to ensure that no side effects occur. That said, the two code locations seem to be independent of each other. Moving the staging directory into the OutputFormat still sounds like a reasonable thing to do, but that's your call in the end. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org