Hi Alex,

A quick question: are you using incremental checkpoints?
Best,
Piotrek

On Sat, Jun 5, 2021 at 21:23 <afilipc...@gmail.com> wrote:

> Small correction: in T4 and T5 I mean Job2, not Job1 (as Job1 was savepointed).
>
> Thank you,
> Alex
>
> On Jun 4, 2021, at 3:07 PM, Alexander Filipchik <afilipc...@gmail.com> wrote:
>
> Looked through the logs and didn't see anything fishy that indicated an exception during checkpointing.
> To make it clearer, here is the timeline (we use unaligned checkpoints, and the state size is around 300 GB):
>
> T1: Job1 was running.
> T2: Job1 was savepointed, brought down and replaced with Job2.
> T3: Attempts to savepoint Job2 failed (timed out). Job2 was cancelled, brought down and replaced by Job3, which was restored from an externalized checkpoint of Job2.
> T3: Attempts to savepoint Job3 failed (timed out). Job3 was cancelled, brought down and replaced by Job4, which was restored from an externalized checkpoint of Job3.
> T4: We realized that the jobs were timing out on savepoints due to local disk throttling. We provisioned disks with more throughput and IO. Job4 was cancelled; Job4 was deployed and restored from an externalized checkpoint of Job3, but it failed as it couldn't find some files in the folder that belongs to the checkpoint of *Job1*.
> T5: We tried to redeploy and restore from checkpoints of Job3 and Job2, but all the attempts failed on reading files from the *folder that belongs to the checkpoint of Job1*.
>
> We checked the content of the folder containing the checkpoints of Job1, and it has files. Not sure what is pointing to the missing files and what could've removed them.
>
> Any way we can figure out what could've happened? Is there a tool that can read the checkpoint and check whether it is valid?
>
> Alex
>
> On Thu, Jun 3, 2021 at 2:12 PM Alexander Filipchik <afilipc...@gmail.com> wrote:
>
>> On the checkpoints -> what kind of issues should I check for? I was looking at the metrics, and they were reporting successful checkpoints. It looks like some files were removed in the shared folder, but I'm not sure how to check for what caused it.
>>
>> Savepoints were failing due to the savepoint timeout. Based on metrics, our attached disks were not fast enough (GCS regional disks are network disks and were throttled). The team cancelled the savepoint and just killed the Kubernetes cluster. I assume some checkpoints were interrupted, as the job triggers them one after another.
>>
>> Is there a known issue with termination during a running checkpoint?
>>
>> Btw, we use the Flink Kubernetes operator from Lyft.
>>
>> Alex
>>
>> On Thu, Jun 3, 2021 at 1:24 AM Chesnay Schepler <ches...@apache.org> wrote:
>>
>>> Is there anything in the Flink logs indicating issues with writing the checkpoint data?
>>> When the savepoint could not be created, was anything logged from Flink?
>>> How did you shut down the cluster?
>>>
>>> On 6/3/2021 5:56 AM, Alexander Filipchik wrote:
>>>
>>> Hi,
>>>
>>> Trying to figure out what happened with our Flink job. We use Flink 1.11.1 and run a job with unaligned checkpoints and the RocksDB backend. The whole state is around 300 GB, judging by the size of the savepoints.
>>>
>>> The job ran OK. At some point we tried to deploy new code, but we couldn't take a savepoint because savepoints were timing out. It looks like the reason they were timing out was disk throttling (we use Google regional disks).
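For reference, a minimal sketch of how the setup described above (RocksDB backend, unaligned checkpoints, retained externalized checkpoints) is typically wired up in Flink 1.11 job code, with a longer checkpoint timeout to ride out throttled disks. The checkpoint path, interval, and timeout below are placeholder values, not the job's actual settings; the boolean constructor argument is what enables the incremental checkpoints Piotrek is asking about:

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CheckpointSetupSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Checkpoint every minute; give slow (throttled) disks more time before a
            // checkpoint or savepoint attempt is declared expired (default is 10 minutes).
            env.enableCheckpointing(60_000L);
            env.getCheckpointConfig().setCheckpointTimeout(30 * 60 * 1000L);
            env.getCheckpointConfig().enableUnalignedCheckpoints(true);

            // Retain the latest checkpoint on cancellation so it can be used as an
            // externalized restore point for the next deployment.
            env.getCheckpointConfig().enableExternalizedCheckpoints(
                    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

            // RocksDB backend; 'true' enables incremental checkpoints, which make newer
            // checkpoints reference SST files under shared/ that were uploaded by earlier
            // checkpoints (possibly under a previous job's checkpoint directory, if the
            // job was restored from that job's externalized checkpoint). Placeholder path.
            env.setStateBackend(new RocksDBStateBackend("gs://<bucket>/app/checkpoints", true));
        }
    }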
>>> The new code was deployed using an externalized checkpoint, but it didn't start, as the job was failing with:
>>>
>>> Caused by: java.io.FileNotFoundException: Item not found: 'gs://../app/checkpoints/2834fa1c81dcf7c9578a8be9a371b0d1/shared/3477b236-fb4b-4a0d-be73-cb6fac62c007'. Note, it is possible that the live version is still available but the requested generation is deleted.
>>>     at com.google.cloud.hadoop.gcsio.GoogleCloudStorageExceptions.createFileNotFoundException(GoogleCloudStorageExceptions.java:45)
>>>     at com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.open(GoogleCloudStorageImpl.java:653)
>>>     at com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.open(GoogleCloudStorageFileSystem.java:277)
>>>     at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream.<init>(GoogleHadoopFSInputStream.java:78)
>>>     at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.open(GoogleHadoopFileSystemBase.java:620)
>>>     at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:950)
>>>     at com.css.flink.fs.gcs.moved.HadoopFileSystem.open(HadoopFileSystem.java:120)
>>>     at com.css.flink.fs.gcs.moved.HadoopFileSystem.open(HadoopFileSystem.java:37)
>>>     at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.open(PluginFileSystemFactory.java:127)
>>>     at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:85)
>>>     at org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:69)
>>>     at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:126)
>>>     at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.lambda$createDownloadRunnables$0(RocksDBStateDownloader.java:109)
>>>     at org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:50)
>>>     at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1640)
>>>     at org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211)
>>>     at java.util.concurrent.CompletableFuture.asyncRunStage(CompletableFuture.java:1654)
>>>     at java.util.concurrent.CompletableFuture.runAsync(CompletableFuture.java:1871)
>>>     at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForAllStateHandles(RocksDBStateDownloader.java:83)
>>>     at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.transferAllStateDataToDirectory(RocksDBStateDownloader.java:66)
>>>     at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.transferRemoteStateToLocalDirectory(RocksDBIncrementalRestoreOperation.java:230)
>>>     at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:195)
>>>     at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:169)
>>>     at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:155)
>>>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:270)
>>>     ... 15 more
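One low-tech way to confirm whether the object named in the exception is actually gone from GCS (as opposed to the restore resolving a stale path) is to probe it through the same Hadoop FileSystem layer the GCS connector uses. A rough sketch, assuming the gcs-connector and its credentials are on the classpath and configured (e.g. via core-site.xml); the bucket name is a placeholder for the one elided in the trace:

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class SharedFileCheck {
        public static void main(String[] args) throws Exception {
            // Placeholder URI: substitute the full gs:// path from the FileNotFoundException above.
            String missing = "gs://<bucket>/app/checkpoints/2834fa1c81dcf7c9578a8be9a371b0d1/"
                    + "shared/3477b236-fb4b-4a0d-be73-cb6fac62c007";

            Configuration conf = new Configuration();
            try (FileSystem fs = FileSystem.get(URI.create(missing), conf)) {
                Path p = new Path(missing);
                if (fs.exists(p)) {
                    System.out.println("Exists, size=" + fs.getFileStatus(p).getLen());
                } else {
                    System.out.println("Missing: " + p);
                }
            }
        }
    }

Spot-checking a handful of objects under the Job1 shared/ folder this way narrows down whether files were really deleted or whether the restore is merely resolving paths that never existed.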
>>> We tried to roll back the code, and we tried different checkpoints, but all the attempts failed with the same error. The job ID in the error is not from the same checkpoint path; it looks like the restore logic looks back at previous jobs, as all the checkpoints after 2834fa1c81dcf7c9578a8be9a371b0d1 are failing to restore with the same error.
>>> We looked at different checkpoints and found that some of them are missing the metadata file and can't be used for restoration.
>>> We also use ZK for HA, and we cleaned up the state there between deployments to make sure the non-existent file is not coming from there.
>>> We decided to drop the state, as we have means to repopulate it, but it would be great to get to the bottom of it. Any help will be appreciated.
>>>
>>> Alex
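Since some retained checkpoints were found to be missing their metadata file, one quick triage step is to separate complete checkpoints (those whose chk-<n> directory contains a _metadata file) from incomplete ones by walking a job's checkpoint directory. A sketch under the same Hadoop FileSystem assumptions as above; the root path is passed as an argument, and the chk-<n>/shared/taskowned layout is Flink's default:

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class CheckpointMetadataScan {
        public static void main(String[] args) throws Exception {
            // Placeholder argument: the checkpoint root of one job,
            // e.g. gs://<bucket>/app/checkpoints/<job-id>
            String checkpointRoot = args[0];

            Configuration conf = new Configuration();
            try (FileSystem fs = FileSystem.get(URI.create(checkpointRoot), conf)) {
                for (FileStatus status : fs.listStatus(new Path(checkpointRoot))) {
                    String name = status.getPath().getName();
                    if (!status.isDirectory() || !name.startsWith("chk-")) {
                        continue; // skip shared/ and taskowned/
                    }
                    boolean hasMetadata = fs.exists(new Path(status.getPath(), "_metadata"));
                    System.out.println(name + (hasMetadata ? " has _metadata" : " is MISSING _metadata"));
                }
            }
        }
    }

Only directories that still have a _metadata file are candidates for restore. From there, the State Processor API (Savepoint.load from flink-state-processor-api) can be pointed at a specific chk-<n> path to check whether the metadata itself still deserializes, though reading the metadata alone won't verify that every shared SST file it references is still present.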