Alexis Sarda-Espinosa created FLINK-36190:
---------------------------------------------
             Summary: Completed checkpoint's file cannot be discarded due to network issues
                 Key: FLINK-36190
                 URL: https://issues.apache.org/jira/browse/FLINK-36190
             Project: Flink
          Issue Type: Bug
          Components: Runtime / Checkpointing
    Affects Versions: 1.18.1
            Reporter: Alexis Sarda-Espinosa


I have a job that uses RocksDB with incremental checkpointing, where the checkpoint target is Azure Blob File System. Due to network issues, the Hadoop stack is having problems storing checkpoint files. This, of course, is not a problem in Flink itself, but I have encountered a scenario that apparently left Flink's handles in an inconsistent state.

First, I can see the following logs in the Task Manager:

{noformat}
2024-08-30 17:53:14,952 DEBUG [AsyncOperations-thread-25] org.apache.hadoop.fs.azurebfs.services.AbfsClient:311 - HttpRequestFailure: 0,...,PUT,https://.../checkpoints/035f85c3de3190faa0b129a2def22822/chk-148/6fa8b251-bb61-4a19-b5ae-93c1a885b5a7?resource=file&timeout=90, java.net.SocketException: Connection reset
2024-08-30 17:53:14,952 DEBUG [AsyncOperations-thread-25] org.apache.hadoop.fs.azurebfs.services.AbfsClient:221 - Retrying REST operation CreatePath. RetryCount = 1
2024-08-30 17:53:20,493 DEBUG [AsyncOperations-thread-25] org.apache.hadoop.fs.azurebfs.services.AbfsClient:323 - HttpRequest: CreatePath: 409,PathAlreadyExists,...,PUT,https://.../checkpoints/035f85c3de3190faa0b129a2def22822/chk-148/6fa8b251-bb61-4a19-b5ae-93c1a885b5a7?resource=file&timeout=90
{noformat}

As far as I can tell from Hadoop's source code, this {{PathAlreadyExists}} error is neither retried nor handled on their side, so I assume it must be handled somewhere in Flink's stack. The issue is that the Job Manager later logs this:

{noformat}
2024-08-30 17:54:23,105 WARN [cluster-io-thread-8] org.apache.flink.runtime.checkpoint.CheckpointsCleaner:141 - Could not properly discard completed checkpoint 148.
org.apache.hadoop.fs.FileAlreadyExistsException: Operation failed: "The recursive query parameter value must be true to delete a non-empty directory.", 409, DELETE, https://.../checkpoints/035f85c3de3190faa0b129a2def22822/chk-148?timeout=90&recursive=false, DirectoryNotEmpty, "The recursive query parameter value must be true to delete a non-empty directory. RequestId:4767d0f8-c01f-001d-3905-fba443000000 Time:2024-08-30T17:54:23.0931772Z"
	at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.checkException(AzureBlobFileSystem.java:1380) ~[?:?]
	at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.delete(AzureBlobFileSystem.java:473) ~[?:?]
	at org.apache.flink.fs.azure.common.hadoop.HadoopFileSystem.delete(HadoopFileSystem.java:160) ~[?:?]
	at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.delete(PluginFileSystemFactory.java:155) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.state.filesystem.FsCompletedCheckpointStorageLocation.disposeStorageLocation(FsCompletedCheckpointStorageLocation.java:74) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.checkpoint.CompletedCheckpoint$CompletedCheckpointDiscardObject.discard(CompletedCheckpoint.java:358) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.checkpoint.CheckpointsCleaner.lambda$cleanup$1(CheckpointsCleaner.java:139) ~[flink-dist-1.18.1.jar:1.18.1]
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
	at java.base/java.lang.Thread.run(Unknown Source) [?:?]
Caused by: org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException: Operation failed: "The recursive query parameter value must be true to delete a non-empty directory.", 409, DELETE, https://.../checkpoints/035f85c3de3190faa0b129a2def22822/chk-148?timeout=90&recursive=false, DirectoryNotEmpty, "The recursive query parameter value must be true to delete a non-empty directory. RequestId:4767d0f8-c01f-001d-3905-fba443000000 Time:2024-08-30T17:54:23.0931772Z"
	at org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.completeExecute(AbfsRestOperation.java:231) ~[?:?]
	at org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.lambda$execute$0(AbfsRestOperation.java:191) ~[?:?]
	at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation(IOStatisticsBinding.java:464) ~[?:?]
	at org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.execute(AbfsRestOperation.java:189) ~[?:?]
	at org.apache.hadoop.fs.azurebfs.services.AbfsClient.deletePath(AbfsClient.java:742) ~[?:?]
	at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.delete(AzureBlobFileSystemStore.java:915) ~[?:?]
	at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.delete(AzureBlobFileSystem.java:470) ~[?:?]
	... 8 more
{noformat}
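The {{recursive=false}} query parameter in the DELETE URL above suggests the cleaner asks ABFS for a non-recursive delete of the chk-148 directory, so any leftover file inside it makes the call fail with {{DirectoryNotEmpty}}. Below is a minimal sketch of what I believe the equivalent call looks like through Flink's {{FileSystem}} API; the abfss URI and the class name are placeholders, not the actual Flink code:

{code:java}
// Minimal sketch (not the actual Flink code): reproduce the non-recursive delete that
// FsCompletedCheckpointStorageLocation#disposeStorageLocation appears to issue, based on
// the recursive=false parameter visible in the DELETE URL of the stack trace above.
// The abfss URI is a placeholder and assumes the flink-azure-fs-hadoop plugin is available.
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

public class NonRecursiveDiscardSketch {
    public static void main(String[] args) throws Exception {
        Path chkDir = new Path(
                "abfss://<container>@<account>.dfs.core.windows.net"
                        + "/checkpoints/035f85c3de3190faa0b129a2def22822/chk-148");
        FileSystem fs = chkDir.getFileSystem();

        // recursive = false: succeeds only if chk-148 is empty; with the stray
        // 6fa8b251-bb61-4a19-b5ae-93c1a885b5a7 file still present, ABFS rejects the call
        // with 409 DirectoryNotEmpty (the FileAlreadyExistsException shown above).
        boolean deleted = fs.delete(chkDir, false);
        System.out.println("chk-148 deleted: " + deleted);
    }
}
{code}

A manual recursive delete of the directory ({{fs.delete(chkDir, true)}}) would presumably clean it up, but that is only a workaround.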
And indeed, if I look at that path in the blob storage, I can see that the 6fa8b251-bb61-4a19-b5ae-93c1a885b5a7 file is still there, which prevents the deletion of the folder. I also find it odd that the file's size is 0 bytes even though the checkpoint was marked as completed. Is it normal for some checkpoint files to be empty?

Side note: there are multiple Hadoop classes using the same logger, so the logs in the first block above actually come from {{org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation}}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)