Alexis Sarda-Espinosa created FLINK-36190:
---------------------------------------------

             Summary: Completed checkpoint's file cannot be discarded due to 
network issues
                 Key: FLINK-36190
                 URL: https://issues.apache.org/jira/browse/FLINK-36190
             Project: Flink
          Issue Type: Bug
          Components: Runtime / Checkpointing
    Affects Versions: 1.18.1
            Reporter: Alexis Sarda-Espinosa


I have a job that uses RocksDB with incremental checkpointing, with Azure Blob File System (ABFS) as the checkpoint target. Due to network issues, the Hadoop stack intermittently fails to store checkpoint files. That is of course not a problem in Flink itself, but I have run into a scenario that apparently leaves Flink's checkpoint file handles in an inconsistent state.
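
For context, this is roughly the kind of setup involved. The snippet below is an illustration only, not my actual job: the class name is made up and the container/account in the abfs URI are placeholders.
{noformat}
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetupSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // RocksDB state backend with incremental checkpoints enabled
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));

        // checkpoints are written through the flink-azure-fs-hadoop plugin;
        // placeholder URI, not the real path from the logs below
        env.getCheckpointConfig().setCheckpointStorage(
                "abfs://<container>@<account>.dfs.core.windows.net/checkpoints");

        env.enableCheckpointing(60_000);
        // ... rest of the job graph ...
        env.execute("checkpoint-setup-sketch");
    }
}
{noformat}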

First, I can see the following logs in the Task Manager:
{noformat}
2024-08-30 17:53:14,952 DEBUG [AsyncOperations-thread-25] org.apache.hadoop.fs.azurebfs.services.AbfsClient:311 - HttpRequestFailure: 0,...,PUT,https://.../checkpoints/035f85c3de3190faa0b129a2def22822/chk-148/6fa8b251-bb61-4a19-b5ae-93c1a885b5a7?resource=file&timeout=90, java.net.SocketException: Connection reset
2024-08-30 17:53:14,952 DEBUG [AsyncOperations-thread-25] org.apache.hadoop.fs.azurebfs.services.AbfsClient:221 - Retrying REST operation CreatePath. RetryCount = 1
2024-08-30 17:53:20,493 DEBUG [AsyncOperations-thread-25] org.apache.hadoop.fs.azurebfs.services.AbfsClient:323 - HttpRequest: CreatePath: 409,PathAlreadyExists,...,PUT,https://.../checkpoints/035f85c3de3190faa0b129a2def22822/chk-148/6fa8b251-bb61-4a19-b5ae-93c1a885b5a7?resource=file&timeout=90
{noformat}
As far as I can tell from Hadoop's source code, this {{PathAlreadyExists}} error is neither retried nor handled on their side (see the retry sketch after the stack trace below), so I guess it must be handled somewhere in Flink's stack. The issue is that the Job Manager later logs this:
{noformat}
2024-08-30 17:54:23,105 WARN  [cluster-io-thread-8] org.apache.flink.runtime.checkpoint.CheckpointsCleaner:141 - Could not properly discard completed checkpoint 148.
org.apache.hadoop.fs.FileAlreadyExistsException: Operation failed: "The recursive query parameter value must be true to delete a non-empty directory.", 409, DELETE, https://.../checkpoints/035f85c3de3190faa0b129a2def22822/chk-148?timeout=90&recursive=false, DirectoryNotEmpty, "The recursive query parameter value must be true to delete a non-empty directory. RequestId:4767d0f8-c01f-001d-3905-fba443000000 Time:2024-08-30T17:54:23.0931772Z"
    at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.checkException(AzureBlobFileSystem.java:1380) ~[?:?]
    at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.delete(AzureBlobFileSystem.java:473) ~[?:?]
    at org.apache.flink.fs.azure.common.hadoop.HadoopFileSystem.delete(HadoopFileSystem.java:160) ~[?:?]
    at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.delete(PluginFileSystemFactory.java:155) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.runtime.state.filesystem.FsCompletedCheckpointStorageLocation.disposeStorageLocation(FsCompletedCheckpointStorageLocation.java:74) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.runtime.checkpoint.CompletedCheckpoint$CompletedCheckpointDiscardObject.discard(CompletedCheckpoint.java:358) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.runtime.checkpoint.CheckpointsCleaner.lambda$cleanup$1(CheckpointsCleaner.java:139) ~[flink-dist-1.18.1.jar:1.18.1]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
    at java.base/java.lang.Thread.run(Unknown Source) [?:?]
Caused by: org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException: Operation failed: "The recursive query parameter value must be true to delete a non-empty directory.", 409, DELETE, https://.../checkpoints/035f85c3de3190faa0b129a2def22822/chk-148?timeout=90&recursive=false, DirectoryNotEmpty, "The recursive query parameter value must be true to delete a non-empty directory. RequestId:4767d0f8-c01f-001d-3905-fba443000000 Time:2024-08-30T17:54:23.0931772Z"
    at org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.completeExecute(AbfsRestOperation.java:231) ~[?:?]
    at org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.lambda$execute$0(AbfsRestOperation.java:191) ~[?:?]
    at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation(IOStatisticsBinding.java:464) ~[?:?]
    at org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.execute(AbfsRestOperation.java:189) ~[?:?]
    at org.apache.hadoop.fs.azurebfs.services.AbfsClient.deletePath(AbfsClient.java:742) ~[?:?]
    at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.delete(AzureBlobFileSystemStore.java:915) ~[?:?]
    ... 8 more
{noformat}
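Regarding the retry behavior mentioned before the stack trace: the following is a simplified, from-memory paraphrase of the retry decision in hadoop-azure's exponential retry policy, included only to illustrate why the 409 from the second CreatePath attempt appears to be surfaced to the caller instead of being retried. It is not the actual Hadoop code, and higher-level handling in the ABFS client may differ.
{noformat}
// Illustration only (paraphrased, not the real hadoop-azure implementation):
// the policy retries when there was no HTTP response at all (e.g. the
// "Connection reset" above), on request timeouts, and on most 5xx responses;
// a 409 Conflict such as PathAlreadyExists falls through to the caller.
static boolean shouldRetry(int retryCount, int statusCode, int maxRetries) {
    return retryCount < maxRetries
            && (statusCode == -1                  // no response, e.g. java.net.SocketException
                || statusCode == 408              // request timeout
                || (statusCode >= 500
                    && statusCode != 501
                    && statusCode != 505));
}

// shouldRetry(1, 409, 30) -> false: the second CreatePath attempt, which hit the
// file created by the first (reset) attempt, is reported as PathAlreadyExists.
{noformat}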
And indeed, if I look at that chk-148 path in the blob storage, I can see that the 6fa8b251-bb61-4a19-b5ae-93c1a885b5a7 file is still there, which prevents the deletion of the folder.
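
Since the stack trace shows the cleaner issues the DELETE with recursive=false, a manual recursive delete of the orphaned directory works as a workaround. A rough sketch with Hadoop's FileSystem API follows; the class name is made up, the container/account in the abfs URI are placeholders, and this should of course only be done for a checkpoint that is no longer retained.
{noformat}
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class OrphanedCheckpointCleanup {
    public static void main(String[] args) throws Exception {
        // placeholder URI; ABFS credentials/config would come from core-site.xml
        Path chkDir = new Path(
                "abfs://<container>@<account>.dfs.core.windows.net/checkpoints/035f85c3de3190faa0b129a2def22822/chk-148");

        try (FileSystem fs = FileSystem.get(new URI(chkDir.toString()), new Configuration())) {
            // inspect what was left behind (in my case a single 0-byte file)
            for (FileStatus status : fs.listStatus(chkDir)) {
                System.out.println(status.getPath() + " -> " + status.getLen() + " bytes");
            }
            // recursive=true, unlike the non-recursive delete Flink attempts
            fs.delete(chkDir, true);
        }
    }
}
{noformat}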

I also find it odd that this leftover file has a size of 0 bytes, yet the checkpoint was marked as completed. Is it normal for some checkpoint files to be empty?

Side note: there are multiple Hadoop classes using the same logger name; the logs in the first block above were actually emitted by
{noformat}
org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation{noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
