Bumping this up. Is there any known way to catch it if it happens again? Any logs we should enable?
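In the meantime, this is what we are planning to add to conf/log4j.properties on our side, unless there is a better switch to flip. It is only a guess at the relevant packages (picked from the restore stack trace below), in the log4j2 properties format that Flink 1.11 ships with:

    logger.checkpointing.name = org.apache.flink.runtime.checkpoint
    logger.checkpointing.level = DEBUG
    logger.state.name = org.apache.flink.runtime.state
    logger.state.level = DEBUG
    logger.rocksdb.name = org.apache.flink.contrib.streaming.state
    logger.rocksdb.level = DEBUG

Hoping that DEBUG on the checkpoint coordinator and state packages would at least show when checkpoints are subsumed and their files discarded, so next time we can tell whether Flink deleted the shared files or something outside of Flink did.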
Sent via Superhuman iOS <https://sprh.mn/?vip=afilipc...@gmail.com>

On Thu, Jun 17 2021 at 7:52 AM, Alexander Filipchik <afilipc...@gmail.com> wrote:

> Did some more digging.
> 1) is not an option, as we are not doing any cleanups at the moment. We keep the last 4 checkpoints per job + all the savepoints.
> 2) I looked at the job deployments that happened in the week before the incident. We have 23 deployments in total, and each resulted in a unique job id. I also looked at job-specific metrics and I don't see any evidence of overlapping checkpointing. There is exactly one checkpoint per application, every time it has a different job id, and once a new job checkpoints there are no checkpoints from the previous job id.
>
> A bit of a mystery. Is there a way to at least catch it in the future? Is there any additional telemetry (logs, metrics) we can extract to better understand what is happening?
>
> Alex
>
> On Tue, Jun 8, 2021 at 12:01 AM Piotr Nowojski <pnowoj...@apache.org> wrote:
>
>> Re-adding user mailing list
>>
>> Hey Alex,
>>
>> In that case I can see two scenarios that could lead to missing files. Keep in mind that incremental checkpoints reference previous checkpoints in order to minimise the size of the checkpoint (roughly speaking, only changes since the previous checkpoint are persisted/uploaded/written). Checkpoint number 42 can reference an arbitrary number of previous checkpoints. I suspect that somehow some of those previously referenced checkpoints got deleted. Also keep in mind that savepoints (as of now) are never incremental; they are always full checkpoints. However, externalised checkpoints can be incremental. Back to the scenarios:
>> 1. You might have accidentally removed some older checkpoints from your Job2, maybe thinking they were no longer needed. Maybe you have just kept this single externalised checkpoint directory from steps T3 or T4, disregarding that this externalised checkpoint might be referencing previous checkpoints of Job2?
>> 2. As I mentioned, Flink automatically maintains reference counts of the used files and deletes them when they are no longer used/referenced. However, this works only within a single job/cluster. For example, if between steps T3 and T4 you restarted Job2 and let it run for a bit, it could take more checkpoints that would subsume files that were still part of the externalised checkpoint you previously used to start Job3/Job4. Job2 would have no idea that Job3/Job4 exist, let alone that they are referencing some files from Job2, and those files could have been deleted as soon as Job2 was no longer using/referencing them.
>>
>> Could one of those happen in your case?
>>
>> Best, Piotrek
>>
>> On Mon, Jun 7, 2021 at 8:01 PM Alexander Filipchik <afilipc...@gmail.com> wrote:
>>
>>> Yes, we do use incremental checkpoints.
>>>
>>> Alex
>>>
>>> On Mon, Jun 7, 2021 at 3:12 AM Piotr Nowojski <pnowoj...@apache.org> wrote:
>>>
>>>> Hi Alex,
>>>>
>>>> A quick question. Are you using incremental checkpoints?
>>>>
>>>> Best, Piotrek
>>>>
>>>> On Sat, Jun 5, 2021 at 9:23 PM <afilipc...@gmail.com> wrote:
>>>>
>>>>> Small correction: in T4 and T5 I mean Job2, not Job1 (as Job1 was savepointed).
>>>>>
>>>>> Thank you,
>>>>> Alex
>>>>>
>>>>> On Jun 4, 2021, at 3:07 PM, Alexander Filipchik <afilipc...@gmail.com> wrote:
>>>>>
>>>>> Looked through the logs and didn't see anything fishy that would indicate an exception during checkpointing.
>>>>> To make it clearer, here is the timeline (we use unaligned checkpoints, and the state size is around 300 GB):
>>>>>
>>>>> T1: Job1 was running.
>>>>> T2: Job1 was savepointed, brought down and replaced with Job2.
>>>>> T3: Attempts to savepoint Job2 failed (timed out). Job2 was cancelled, brought down and replaced by Job3, which was restored from an externalized checkpoint of Job2.
>>>>> T3: Attempts to savepoint Job3 failed (timed out). Job3 was cancelled, brought down and replaced by Job4, which was restored from an externalized checkpoint of Job3.
>>>>> T4: We realized that the jobs were timing out on savepoints due to local disk throttling. We provisioned disks with more throughput and IO. Job4 was cancelled, then redeployed and restored from an externalized checkpoint of Job3, but it failed because it couldn't find some files in the folder that belongs to the checkpoint of *Job1*.
>>>>> T5: We tried to redeploy and restore from checkpoints of Job3 and Job2, but all the attempts failed on reading files from the *folder that belongs to the checkpoint of Job1*.
>>>>>
>>>>> We checked the content of the folder containing the checkpoints of Job1, and it has files. We are not sure what is pointing to the missing files or what could have removed them.
>>>>>
>>>>> Is there any way we can figure out what happened? Is there a tool that can read a checkpoint and check whether it is valid?
>>>>>
>>>>> Alex
>>>>>
>>>>> On Thu, Jun 3, 2021 at 2:12 PM Alexander Filipchik <afilipc...@gmail.com> wrote:
>>>>>
>>>>>> On the checkpoints -> what kind of issues should I check for? I was looking at the metrics, and it looks like they were reporting successful checkpoints. It looks like some files were removed in the shared folder, but I'm not sure how to check what caused it.
>>>>>>
>>>>>> Savepoints were failing due to savepoint timeouts. Based on metrics, our attached disks were not fast enough (GCS regional disks are network disks and were throttled). The team cancelled the savepoint and just killed the Kubernetes cluster. I assume some checkpoints were interrupted, as the job triggers them one after another.
>>>>>>
>>>>>> Is there a known issue with termination during a running checkpoint?
>>>>>>
>>>>>> Btw, we use the Flink Kubernetes operator from Lyft.
>>>>>>
>>>>>> Alex
>>>>>>
>>>>>> On Thu, Jun 3, 2021 at 1:24 AM Chesnay Schepler <ches...@apache.org> wrote:
>>>>>>
>>>>>>> Is there anything in the Flink logs indicating issues with writing the checkpoint data?
>>>>>>> When the savepoint could not be created, was anything logged from Flink? How did you shut down the cluster?
>>>>>>>
>>>>>>> On 6/3/2021 5:56 AM, Alexander Filipchik wrote:
>>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> Trying to figure out what happened with our Flink job. We use Flink 1.11.1 and run a job with unaligned checkpoints and the RocksDB state backend. The whole state is around 300 GB, judging by the size of savepoints.
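>>>>>>>
>>>>>>> For context, the checkpointing setup looks roughly like this (a simplified sketch, not our exact code; the path, interval and timeout below are placeholders):
>>>>>>>
>>>>>>> import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
>>>>>>> import org.apache.flink.streaming.api.environment.CheckpointConfig;
>>>>>>> import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
>>>>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>>>>
>>>>>>> public class CheckpointSetupSketch {
>>>>>>>     public static void main(String[] args) throws Exception {
>>>>>>>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>
>>>>>>>         // RocksDB state backend with incremental checkpoints enabled; checkpoints go to GCS.
>>>>>>>         env.setStateBackend(new RocksDBStateBackend("gs://my-bucket/app/checkpoints", true));
>>>>>>>
>>>>>>>         env.enableCheckpointing(60_000); // trigger a checkpoint roughly every minute
>>>>>>>         CheckpointConfig checkpointConfig = env.getCheckpointConfig();
>>>>>>>         checkpointConfig.enableUnalignedCheckpoints();
>>>>>>>         checkpointConfig.setCheckpointTimeout(10 * 60 * 1000); // triggered savepoints seem to run against this timeout as well
>>>>>>>         // Keep the externalized checkpoint when a job is cancelled,
>>>>>>>         // so the next deployment can be restored from it.
>>>>>>>         checkpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>>>>>>>
>>>>>>>         // Placeholder pipeline, just so the sketch runs; the real sources/operators/sinks are omitted.
>>>>>>>         env.fromElements("a", "b", "c").print();
>>>>>>>         env.execute("checkpoint-setup-sketch");
>>>>>>>     }
>>>>>>> }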
>>>>>>>
>>>>>>> The job ran OK. At some point we tried to deploy new code, but we couldn't take savepoints, as they were timing out. It looks like the reason it was timing out was disk throttling (we use Google regional disks).
>>>>>>> The new code was deployed using an externalized checkpoint, but it didn't start, as the job was failing with:
>>>>>>>
>>>>>>> Caused by: java.io.FileNotFoundException: Item not found: 'gs://../app/checkpoints/2834fa1c81dcf7c9578a8be9a371b0d1/shared/3477b236-fb4b-4a0d-be73-cb6fac62c007'. Note, it is possible that the live version is still available but the requested generation is deleted.
>>>>>>>     at com.google.cloud.hadoop.gcsio.GoogleCloudStorageExceptions.createFileNotFoundException(GoogleCloudStorageExceptions.java:45)
>>>>>>>     at com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.open(GoogleCloudStorageImpl.java:653)
>>>>>>>     at com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.open(GoogleCloudStorageFileSystem.java:277)
>>>>>>>     at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream.<init>(GoogleHadoopFSInputStream.java:78)
>>>>>>>     at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.open(GoogleHadoopFileSystemBase.java:620)
>>>>>>>     at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:950)
>>>>>>>     at com.css.flink.fs.gcs.moved.HadoopFileSystem.open(HadoopFileSystem.java:120)
>>>>>>>     at com.css.flink.fs.gcs.moved.HadoopFileSystem.open(HadoopFileSystem.java:37)
>>>>>>>     at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.open(PluginFileSystemFactory.java:127)
>>>>>>>     at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:85)
>>>>>>>     at org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:69)
>>>>>>>     at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:126)
>>>>>>>     at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.lambda$createDownloadRunnables$0(RocksDBStateDownloader.java:109)
>>>>>>>     at org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:50)
>>>>>>>     at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1640)
>>>>>>>     at org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211)
>>>>>>>     at java.util.concurrent.CompletableFuture.asyncRunStage(CompletableFuture.java:1654)
>>>>>>>     at java.util.concurrent.CompletableFuture.runAsync(CompletableFuture.java:1871)
>>>>>>>     at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForAllStateHandles(RocksDBStateDownloader.java:83)
>>>>>>>     at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.transferAllStateDataToDirectory(RocksDBStateDownloader.java:66)
>>>>>>>     at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.transferRemoteStateToLocalDirectory(RocksDBIncrementalRestoreOperation.java:230)
>>>>>>>     at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:195)
>>>>>>>     at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:169)
>>>>>>>     at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:155)
>>>>>>>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:270)
>>>>>>>     ... 15 more
>>>>>>>
>>>>>>> We tried to roll back the code and we tried different checkpoints, but all the attempts failed with the same error. The job ID in the error is not from the same checkpoint path; it looks like the restore logic looks back at previous jobs, as all the checkpoints after 2834fa1c81dcf7c9578a8be9a371b0d1 are failing to restore with the same error.
>>>>>>> We looked at different checkpoints and found that some of them are missing the metadata file and can't be used for restoration.
>>>>>>> We also use ZK for HA, and we cleaned up the state there between deployments to make sure the non-existent file is not coming from there.
>>>>>>> We decided to drop the state, as we have the means to repopulate it, but it would be great to get to the bottom of this. Any help will be appreciated.
>>>>>>>
>>>>>>> Alex
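In case it is useful for whoever picks this up: below is the kind of sanity check we are planning to run against the retained checkpoint directories before the next deployment. It is only a rough sketch (the class name and bucket are made up) that uses the Hadoop FileSystem API our GCS connector already provides, and it only verifies that every retained chk-* directory still contains a non-empty _metadata file. It does not follow the references into the shared/ folder, which is where our files went missing.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Rough sanity check for retained Flink checkpoints: for every chk-* directory
// under a job's checkpoint root, verify that _metadata exists and is non-empty.
// This does NOT prove the checkpoint is restorable, it only flags the obviously
// broken ones.
public class CheckpointSanityCheck {

    public static void main(String[] args) throws IOException {
        // Placeholder path; pass the real checkpoint root of one job id as the first argument.
        Path checkpointRoot = new Path(args.length > 0
                ? args[0]
                : "gs://my-bucket/app/checkpoints/2834fa1c81dcf7c9578a8be9a371b0d1");

        // Assumes the gs:// Hadoop connector is on the classpath and configured
        // (e.g. via core-site.xml), the same way the Flink job sees it.
        FileSystem fs = checkpointRoot.getFileSystem(new Configuration());

        for (FileStatus status : fs.listStatus(checkpointRoot)) {
            // Only look at chk-<n> directories; skip shared/ and taskowned/.
            if (!status.isDirectory() || !status.getPath().getName().startsWith("chk-")) {
                continue;
            }
            Path metadata = new Path(status.getPath(), "_metadata");
            if (!fs.exists(metadata)) {
                System.out.println("MISSING _metadata: " + status.getPath());
            } else if (fs.getFileStatus(metadata).getLen() == 0) {
                System.out.println("EMPTY _metadata: " + status.getPath());
            } else {
                System.out.println("ok: " + status.getPath());
            }
        }
    }
}

If there is an existing tool that does a deeper check (actually reading _metadata and verifying that every referenced file in shared/ still exists), we would much rather use that.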