Small correction: in T4 and T5 I mean Job2, not Job1 (as Job1 was savepointed).
Thank you,
Alex

> On Jun 4, 2021, at 3:07 PM, Alexander Filipchik <[email protected]> wrote:
>
> Looked through the logs and didn't see anything fishy that indicated an
> exception during checkpointing.
> To make it clearer, here is the timeline (we use unaligned checkpoints, and
> the state size is around 300 GB):
>
> T1: Job1 was running.
> T2: Job1 was savepointed, brought down and replaced with Job2.
> T3: Attempts to savepoint Job2 failed (timed out). Job2 was cancelled,
> brought down and replaced by Job3, which was restored from an externalized
> checkpoint of Job2.
> T3: Attempts to savepoint Job3 failed (timed out). Job3 was cancelled,
> brought down and replaced by Job4, which was restored from an externalized
> checkpoint of Job3.
> T4: We realized that savepoints were timing out due to local disk
> throttling. We provisioned disks with more throughput and IO. Job4 was
> cancelled, Job4 was deployed and restored from an externalized checkpoint of
> Job3, but failed as it couldn't find some files in the folder that belongs to
> the checkpoint of Job1.
> T5: We tried to redeploy and restore from checkpoints of Job3 and Job2, but
> all the attempts failed on reading files from the folder that belongs to the
> checkpoint of Job1.
>
> We checked the content of the folder containing checkpoints of Job1, and it
> has files. Not sure what is pointing to the missing files and what could've
> removed them.
>
> Is there any way we can figure out what could've happened? Is there a tool
> that can read the checkpoint and check whether it is valid?
>
> Alex
>
>> On Thu, Jun 3, 2021 at 2:12 PM Alexander Filipchik <[email protected]>
>> wrote:
>> On the checkpoints: what kind of issues should I check for? I was looking
>> at metrics, and it looks like they were reporting successful checkpoints. It
>> looks like some files were removed in the shared folder, but I'm not sure
>> how to check what caused it.
>>
>> Savepoints were failing due to the savepoint timeout. Based on metrics,
>> our attached disks were not fast enough (GCP regional disks are network
>> disks and were throttled). The team cancelled the savepoint and just killed
>> the Kubernetes cluster. I assume some checkpoints were interrupted, as the
>> job triggers them one after another.
>>
>> Is there a known issue with termination during a running checkpoint?
>>
>> Btw, we use the Flink Kube operator from Lyft.
>>
>> Alex
>>
>> On Thu, Jun 3, 2021 at 1:24 AM Chesnay Schepler <[email protected]> wrote:
>>> Is there anything in the Flink logs indicating issues with writing the
>>> checkpoint data?
>>> When the savepoint could not be created, was anything logged from Flink?
>>> How did you shut down the cluster?
>>>
>>> On 6/3/2021 5:56 AM, Alexander Filipchik wrote:
>>>> Hi,
>>>>
>>>> Trying to figure out what happened with our Flink job. We use Flink 1.11.1
>>>> and run a job with unaligned checkpoints and the RocksDB backend. The whole
>>>> state is around 300 GB, judging by the size of savepoints.
>>>>
>>>> The job ran OK. At some point we tried to deploy new code, but we couldn't
>>>> take a savepoint as they were timing out. It looks like they were timing
>>>> out because of disk throttling (we use Google regional disks).
>>>> The new code was deployed using an externalized checkpoint, but it didn't
>>>> start as the job was failing with:
>>>>
>>>> Caused by: java.io.FileNotFoundException: Item not found:
>>>> 'gs://../app/checkpoints/2834fa1c81dcf7c9578a8be9a371b0d1/shared/3477b236-fb4b-4a0d-be73-cb6fac62c007'.
>>>> Note, it is possible that the live version is still available but the
>>>> requested generation is deleted.
>>>>     at com.google.cloud.hadoop.gcsio.GoogleCloudStorageExceptions.createFileNotFoundException(GoogleCloudStorageExceptions.java:45)
>>>>     at com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.open(GoogleCloudStorageImpl.java:653)
>>>>     at com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.open(GoogleCloudStorageFileSystem.java:277)
>>>>     at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream.<init>(GoogleHadoopFSInputStream.java:78)
>>>>     at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.open(GoogleHadoopFileSystemBase.java:620)
>>>>     at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:950)
>>>>     at com.css.flink.fs.gcs.moved.HadoopFileSystem.open(HadoopFileSystem.java:120)
>>>>     at com.css.flink.fs.gcs.moved.HadoopFileSystem.open(HadoopFileSystem.java:37)
>>>>     at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.open(PluginFileSystemFactory.java:127)
>>>>     at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:85)
>>>>     at org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:69)
>>>>     at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:126)
>>>>     at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.lambda$createDownloadRunnables$0(RocksDBStateDownloader.java:109)
>>>>     at org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:50)
>>>>     at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1640)
>>>>     at org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211)
>>>>     at java.util.concurrent.CompletableFuture.asyncRunStage(CompletableFuture.java:1654)
>>>>     at java.util.concurrent.CompletableFuture.runAsync(CompletableFuture.java:1871)
>>>>     at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForAllStateHandles(RocksDBStateDownloader.java:83)
>>>>     at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.transferAllStateDataToDirectory(RocksDBStateDownloader.java:66)
>>>>     at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.transferRemoteStateToLocalDirectory(RocksDBIncrementalRestoreOperation.java:230)
>>>>     at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:195)
>>>>     at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:169)
>>>>     at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:155)
>>>>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:270)
>>>>     ... 15 more
>>>>
>>>> We tried to roll back the code and we tried different checkpoints, but all
>>>> the attempts failed with the same error. The job ID in the error is not
>>>> from the same checkpoint path. It looks like the restore logic looks back
>>>> at previous jobs, as all the checkpoints after
>>>> 2834fa1c81dcf7c9578a8be9a371b0d1 are failing to restore with the same
>>>> error.
>>>> We looked at different checkpoints and found that some of them are missing
>>>> the metadata file and can't be used for restoration.
>>>> We also use ZK for HA, and we cleaned up the state there between
>>>> deployments to make sure the non-existent file is not coming from there.
>>>>
>>>> We decided to drop the state as we have means to repopulate it, but it
>>>> would be great to get to the bottom of it. Any help will be appreciated.
>>>>
>>>> Alex
>>>
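On the question of a tool that checks whether a checkpoint is usable, here is a minimal structural check. It is a sketch, not an existing Flink tool. It assumes the checkpoint directory has been copied or mounted to a local filesystem. It also assumes Flink's default filesystem layout: `<checkpoint-dir>/<job-id>/chk-<n>/_metadata`, plus `<job-id>/shared/` and `<job-id>/taskowned/`.

The script flags two kinds of problem:

- `chk-<n>` directories with no `_metadata` file. These cannot be used for a restore.
- Referenced shared files that are absent. During a RocksDB incremental restore these produce the `FileNotFoundException` shown above.

It does not parse `_metadata`. The list of referenced shared files therefore has to come from elsewhere, for example the paths printed in restore errors. The function and class names are hypothetical. Grouping missing files by job ID mirrors the observation in the thread that the missing file lived under an earlier job's `shared/` folder.

```python
import os
import re
from dataclasses import dataclass, field
from typing import Dict, List

# Matches Flink's per-checkpoint directories, e.g. "chk-42" (assumed default layout).
CHK_DIR = re.compile(r"^chk-(\d+)$")


@dataclass
class JobReport:
    job_id: str
    complete: List[int] = field(default_factory=list)    # chk-N dirs containing _metadata
    incomplete: List[int] = field(default_factory=list)  # chk-N dirs without _metadata
    shared_files: int = 0                                 # file count under <job-id>/shared/


def scan_checkpoint_root(root: str) -> Dict[str, JobReport]:
    """Classify every chk-N directory under each <job-id> folder in `root`."""
    reports: Dict[str, JobReport] = {}
    for job_id in sorted(os.listdir(root)):
        job_dir = os.path.join(root, job_id)
        if not os.path.isdir(job_dir):
            continue
        report = JobReport(job_id)
        for entry in os.listdir(job_dir):
            path = os.path.join(job_dir, entry)
            if not os.path.isdir(path):
                continue
            match = CHK_DIR.match(entry)
            if match:
                # No _metadata means the checkpoint was never finalized (or was
                # partially deleted), so it cannot be used for a restore.
                has_metadata = os.path.isfile(os.path.join(path, "_metadata"))
                target = report.complete if has_metadata else report.incomplete
                target.append(int(match.group(1)))
            elif entry == "shared":
                # Count files recursively under the job's shared/ folder.
                report.shared_files = sum(len(files) for _, _, files in os.walk(path))
        report.complete.sort()
        report.incomplete.sort()
        reports[job_id] = report
    return reports


def missing_shared_files(root: str, referenced: List[str]) -> Dict[str, List[str]]:
    """Return referenced paths (relative to `root`, e.g. "<job-id>/shared/<file>")
    that do not exist locally, grouped by the job ID that owns the folder."""
    missing: Dict[str, List[str]] = {}
    for rel in referenced:
        if not os.path.isfile(os.path.join(root, *rel.split("/"))):
            owner = rel.split("/", 1)[0]
            missing.setdefault(owner, []).append(rel)
    return missing
```

Usage: call `scan_checkpoint_root` on a local copy of the checkpoint directory, for example a hypothetical `/tmp/checkpoints-copy`. Then call `missing_shared_files` on that root with paths taken from the error, such as `["2834fa1c81dcf7c9578a8be9a371b0d1/shared/3477b236-fb4b-4a0d-be73-cb6fac62c007"]`. This shows which earlier job's `shared/` folder lost the file.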
