Bumping this up. Is there any known way to catch it if it happens again? Any
logs we should enable?
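
For what it's worth, one check that could catch this earlier is sketched below. It is only a sketch under assumptions: the list of paths a checkpoint references and the listing of objects actually present in the bucket are assumed to be collected by some other means (nothing here calls Flink or GCS), and the name `find_missing_shared_files` is made up for illustration. The path pattern follows the `.../checkpoints/<job id>/shared/<file>` layout from the FileNotFoundException quoted further down.

```python
import re
from collections import defaultdict

# Layout taken from the error in this thread:
#   .../checkpoints/<32-hex job id>/shared/<file name>
SHARED_PATH = re.compile(
    r"/checkpoints/(?P<job_id>[0-9a-f]{32})/shared/(?P<name>[^/]+)$"
)


def find_missing_shared_files(referenced_paths, existing_paths):
    """Return {job_id: [paths]} for referenced files that are not in storage.

    Both arguments are plain iterables of path strings (hypothetical inputs:
    how they are obtained is outside this sketch).
    """
    existing = set(existing_paths)
    missing = defaultdict(list)
    for path in referenced_paths:
        if path in existing:
            continue
        match = SHARED_PATH.search(path)
        # Paths that do not follow the shared-folder layout are grouped
        # under "unknown" instead of being dropped silently.
        job_id = match.group("job_id") if match else "unknown"
        missing[job_id].append(path)
    return dict(missing)


if __name__ == "__main__":
    old_job = "a" * 32  # placeholder job ids, not real ones
    new_job = "b" * 32
    referenced = [
        f"gs://bucket/app/checkpoints/{old_job}/shared/file-1",
        f"gs://bucket/app/checkpoints/{new_job}/shared/file-2",
    ]
    existing = [f"gs://bucket/app/checkpoints/{new_job}/shared/file-2"]
    for job_id, paths in find_missing_shared_files(referenced, existing).items():
        print(f"job {job_id}: {len(paths)} referenced shared file(s) missing")
```

Grouping by job id is deliberate: in our incident the missing files lived under an older job's folder, so that is the first thing one would want to see in an alert.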



On Thu, Jun 17 2021 at 7:52 AM, Alexander Filipchik <afilipc...@gmail.com>
wrote:

> Did some more digging.
> 1) is not an option as we are not doing any cleanups at the moment. We
> keep the last 4 checkpoints per job + all the savepoints.
> 2) I looked at job deployments that happened 1 week before the incident.
> We have 23 deployments in total and each resulted in a unique job id. I
> also looked at job specific metrics and I don't see any evidence of
> overlapping checkpointing. There is exactly 1 checkpoint per application at
> a time; each time it has a different job id, and once a new job starts
> checkpointing there are no more checkpoints from the previous job id.
>
> A bit of a mystery. Is there a way to at least catch it in the future? Is
> there any additional telemetry (logs, metrics) we can extract to better
> understand what is happening?
>
> Alex
>
> On Tue, Jun 8, 2021 at 12:01 AM Piotr Nowojski <pnowoj...@apache.org>
> wrote:
>
>> Re-adding user mailing list
>>
>> Hey Alex,
>>
>> In that case I can see two scenarios that could lead to missing files.
>> Keep in mind that incremental checkpoints are referencing previous
>> checkpoints in order to minimise the size of the checkpoint (roughly
>> speaking only changes since the previous checkpoint are being
>> persisted/uploaded/written). Checkpoint number 42 can reference an
>> arbitrary number of previous checkpoints. I suspect that somehow some of
>> those previously referenced checkpoints got deleted. Also keep
>> in mind that savepoints (as of now) are never incremental, they are always
>> full checkpoints. However externalised checkpoints can be incremental. Back
>> to the scenarios:
>> 1. You might have accidentally removed some older checkpoints from your
>> Job2, maybe thinking they are no longer needed. Maybe you have just kept
>> this single externalised checkpoint directory from steps T3 or T4,
>> disregarding that this externalised checkpoint might be referencing
>> previous checkpoints of Job2?
>> 2. As I mentioned, Flink is automatically maintaining reference counts of
>> the used files and deletes them when they are no longer used/referenced.
>> However this works only within a single job/cluster. For example if between
>> steps T3 and T4, you restarted Job2 and let it run for a bit, it could take
>> more checkpoints that would subsume files that were still part of the
>> externalised checkpoint that you previously used to start Job3/Job4. Job2
>> would have no idea that Job3/Job4 exist, let alone that they are
>> referencing some files from Job2, and those files could have been deleted
>> as soon as Job2 was no longer using/referencing them.
>>
>> Could one of those happen in your case?
>>
>> Best, Piotrek
>>
>> On Mon, Jun 7, 2021 at 8:01 PM Alexander Filipchik <afilipc...@gmail.com>
>> wrote:
>>
>>> Yes, we do use incremental checkpoints.
>>>
>>> Alex
>>>
>>> On Mon, Jun 7, 2021 at 3:12 AM Piotr Nowojski <pnowoj...@apache.org>
>>> wrote:
>>>
>>>> Hi Alex,
>>>>
>>>> A quick question. Are you using incremental checkpoints?
>>>>
>>>> Best, Piotrek
>>>>
>>>> On Sat, Jun 5, 2021 at 9:23 PM <afilipc...@gmail.com> wrote:
>>>>
>>>>> Small correction: in T4 and T5 I mean Job2, not Job1 (as Job1 was
>>>>> savepointed).
>>>>>
>>>>> Thank you,
>>>>> Alex
>>>>>
>>>>> On Jun 4, 2021, at 3:07 PM, Alexander Filipchik <afilipc...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>
>>>>> Looked through the logs and didn't see anything fishy that indicated
>>>>> an exception during checkpointing.
>>>>> To make it clearer, here is the timeline (we use unaligned
>>>>> checkpoints, and the state size is around 300 GB):
>>>>>
>>>>> T1: Job1 was running
>>>>> T2: Job1 was savepointed, brought down and replaced with Job2.
>>>>> T3: Attempts to savepoint Job2 failed (timed out). Job2 was cancelled,
>>>>> brought down and replaced by Job3, which was restored from the
>>>>> externalized checkpoint of Job2.
>>>>> T3: Attempts to savepoint Job3 failed (timed out). Job3 was cancelled,
>>>>> brought down and replaced by Job4, which was restored from the
>>>>> externalized checkpoint of Job3.
>>>>> T4: We realized that savepoints were timing out due to local
>>>>> disk throttling. We provisioned disks with more throughput and IO. Job4 was
>>>>> cancelled, Job4 was deployed and restored from the externalized checkpoint
>>>>> of Job3, but failed as it couldn't find some files in the folder that
>>>>> belongs to the checkpoint of *Job1*.
>>>>> T5: We tried to redeploy and restore from checkpoints of Job3 and
>>>>> Job2, but all the attempts failed on reading files from the *folder
>>>>> that belongs to the checkpoint of Job1*.
>>>>>
>>>>> We checked the content of the folder containing checkpoints of Job1,
>>>>> and it has files. Not sure what is pointing to the missing files and what
>>>>> could've removed them.
>>>>>
>>>>> Any way we can figure out what could've happened? Is there a tool that
>>>>> can read the checkpoint and check whether it is valid?
>>>>>
>>>>> Alex
>>>>>
>>>>> On Thu, Jun 3, 2021 at 2:12 PM Alexander Filipchik <
>>>>> afilipc...@gmail.com> wrote:
>>>>>
>>>>>> On the checkpoints -> what kind of issues should I check for? I was
>>>>>> looking for metrics and it looks like they were reporting successful
>>>>>> checkpoints. It looks like some files were removed in the shared folder,
>>>>>> but I'm not sure how to check for what caused it.
>>>>>>
>>>>>> Savepoints were failing due to the savepoint timeout. Based on
>>>>>> metrics, our attached disks were not fast enough (GCS regional disks are
>>>>>> network disks and were throttled). The team cancelled the savepoint and
>>>>>> just killed the Kubernetes cluster. I assume some checkpoints were
>>>>>> interrupted, as the job triggers them one after another.
>>>>>>
>>>>>> Is there a known issue with terminating a job while a checkpoint is running?
>>>>>>
>>>>>> Btw, we use the Flink Kube operator from Lyft.
>>>>>>
>>>>>> Alex
>>>>>>
>>>>>> On Thu, Jun 3, 2021 at 1:24 AM Chesnay Schepler <ches...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Is there anything in the Flink logs indicating issues with writing
>>>>>>> the checkpoint data?
>>>>>>> When the savepoint could not be created, was anything logged from
>>>>>>> Flink? How did you shut down the cluster?
>>>>>>>
>>>>>>> On 6/3/2021 5:56 AM, Alexander Filipchik wrote:
>>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> Trying to figure out what happened with our Flink job. We use Flink
>>>>>>> 1.11.1 and run a job with unaligned checkpoints and the RocksDB backend.
>>>>>>> The whole state is around 300 GB, judging by the size of savepoints.
>>>>>>>
>>>>>>> The job ran OK. At some point we tried to deploy new code, but we
>>>>>>> couldn't take a savepoint as they were timing out. It looks like the
>>>>>>> timeouts were caused by disk throttling (we use Google regional disks).
>>>>>>> The new code was deployed using an externalized checkpoint, but it
>>>>>>> didn't start, as the job was failing with:
>>>>>>>
>>>>>>> Caused by: java.io.FileNotFoundException: Item not found: 'gs://../app/checkpoints/2834fa1c81dcf7c9578a8be9a371b0d1/shared/3477b236-fb4b-4a0d-be73-cb6fac62c007'. Note, it is possible that the live version is still available but the requested generation is deleted.
>>>>>>>     at com.google.cloud.hadoop.gcsio.GoogleCloudStorageExceptions.createFileNotFoundException(GoogleCloudStorageExceptions.java:45)
>>>>>>>     at com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.open(GoogleCloudStorageImpl.java:653)
>>>>>>>     at com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.open(GoogleCloudStorageFileSystem.java:277)
>>>>>>>     at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream.<init>(GoogleHadoopFSInputStream.java:78)
>>>>>>>     at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.open(GoogleHadoopFileSystemBase.java:620)
>>>>>>>     at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:950)
>>>>>>>     at com.css.flink.fs.gcs.moved.HadoopFileSystem.open(HadoopFileSystem.java:120)
>>>>>>>     at com.css.flink.fs.gcs.moved.HadoopFileSystem.open(HadoopFileSystem.java:37)
>>>>>>>     at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.open(PluginFileSystemFactory.java:127)
>>>>>>>     at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:85)
>>>>>>>     at org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:69)
>>>>>>>     at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:126)
>>>>>>>     at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.lambda$createDownloadRunnables$0(RocksDBStateDownloader.java:109)
>>>>>>>     at org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:50)
>>>>>>>     at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1640)
>>>>>>>     at org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211)
>>>>>>>     at java.util.concurrent.CompletableFuture.asyncRunStage(CompletableFuture.java:1654)
>>>>>>>     at java.util.concurrent.CompletableFuture.runAsync(CompletableFuture.java:1871)
>>>>>>>     at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForAllStateHandles(RocksDBStateDownloader.java:83)
>>>>>>>     at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.transferAllStateDataToDirectory(RocksDBStateDownloader.java:66)
>>>>>>>     at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.transferRemoteStateToLocalDirectory(RocksDBIncrementalRestoreOperation.java:230)
>>>>>>>     at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:195)
>>>>>>>     at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:169)
>>>>>>>     at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:155)
>>>>>>>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:270)
>>>>>>>     ... 15 more
>>>>>>>
>>>>>>> We tried to roll back the code and we tried different checkpoints, but
>>>>>>> all the attempts failed with the same error. The job ID in the error is
>>>>>>> not the one from the checkpoint path we restored from. It looks like the
>>>>>>> restore logic looks back at previous jobs, as all the checkpoints after
>>>>>>> 2834fa1c81dcf7c9578a8be9a371b0d1 fail to restore with the same error.
>>>>>>> We looked at different checkpoints and found that some of them are
>>>>>>> missing the metadata file and can't be used for restoration.
>>>>>>> We also use ZK for HA, and we cleaned up the state there between
>>>>>>> deployments to make sure the reference to the nonexistent file
>>>>>>> is not coming from there.
>>>>>>> We decided to drop the state as we have means to repopulate it, but
>>>>>>> it would be great to get to the bottom of it. Any help will be appreciated.
>>>>>>>
>>>>>>> Alex
>>>>>>>
>>>>>>>
>>>>>>>
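
Scenario 2 from Piotrek's reply (per-job reference counting deleting files that another job still needs) can be illustrated with a toy model. This is a sketch for intuition only, not Flink's implementation: `ToyJob`, `simulate` and the `sst-*` file names are made up, storage is a plain Python set, and each job retains just one checkpoint to keep the example short.

```python
class ToyJob:
    """Toy stand-in for one job's checkpoint bookkeeping (not a Flink class)."""

    def __init__(self, storage, retained=1):
        self.storage = storage    # set of file names, shared by all jobs
        self.retained = retained  # number of checkpoints this job keeps
        self.checkpoints = []     # sets of files, oldest first
        self.ref_counts = {}      # file -> retained checkpoints using it

    def checkpoint(self, files):
        """Register a new checkpoint and discard the ones it subsumes."""
        files = set(files)
        self.storage.update(files)
        for name in files:
            self.ref_counts[name] = self.ref_counts.get(name, 0) + 1
        self.checkpoints.append(files)
        while len(self.checkpoints) > self.retained:
            self._discard(self.checkpoints.pop(0))

    def _discard(self, files):
        # A file is deleted once no checkpoint retained by THIS job uses it.
        # The job has no way to know that another job still references it.
        for name in files:
            self.ref_counts[name] -= 1
            if self.ref_counts[name] == 0:
                del self.ref_counts[name]
                self.storage.discard(name)


def simulate():
    storage = set()
    job2 = ToyJob(storage)

    # Job2 writes an externalised checkpoint made of two shared files.
    job2.checkpoint({"sst-1", "sst-2"})

    # Job3 is restored from that checkpoint, so it depends on both files.
    job3_needs = {"sst-1", "sst-2"}

    # Job2 keeps running and checkpoints again; sst-1 is no longer part of
    # its state, so Job2 drops the last reference and deletes the file.
    job2.checkpoint({"sst-2", "sst-3"})

    # Files Job3 still needs but can no longer find in storage.
    return job3_needs - storage


if __name__ == "__main__":
    print("missing for Job3:", sorted(simulate()))
```

In the toy run Job3 ends up missing `sst-1`, a file that lives under the older job's bookkeeping, which has the same shape as the error reported in this thread: a restore failing on a file in a previous job's shared folder.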
