Hey Alex,

Sorry, I've missed your previous email. I've spent a bit more time
searching our Jira for relevant bugs and maybe you were hit by this one:
https://issues.apache.org/jira/browse/FLINK-21351
?
> T2: Job1 was savepointed, brought down and replaced with Job2.

This in combination with FLINK-21351 could cause Flink to incorrectly
remove still referenced incremental checkpoints. That's my best explanation
as to what has caused this. Could you try upgrading to the latest 1.12.x or
1.13.x release?

Best,
Piotrek

sob., 3 lip 2021 o 23:39 Alexander Filipchik <afilipc...@gmail.com>
napisał(a):

> Bumping it up, any known way to catch it if it happens again ? Any logs we
> should enable?
>
> Sent via Superhuman iOS <https://sprh.mn/?vip=afilipc...@gmail.com>
>
>
> On Thu, Jun 17 2021 at 7:52 AM, Alexander Filipchik <afilipc...@gmail.com>
> wrote:
>
>> Did some more digging.
>> 1) is not an option as we are not doing any cleanups at the moment. We
>> keep the last 4 checkpoints per job + all the savepoints.
>> 2) I looked at job deployments that happened 1 week before the incident.
>> We have 23 deployments in total and each resulted in a unique job id. I
>> also looked at job specific metrics and I don't see any evidence of
>> overlapping checkpointing. There is exactly 1 checkpoint per application,
>> every time it has a different job id and every time once a new job
>> checkpoints there are now checkpoints from previous job id.
>>
>> A bit of a mystery. Is there a way to at least catch it in the future?
>> Any additional telemetry (logs, metrics) we can extract to better
>> understand what is happening.
>>
>> Alex
>>
>> On Tue, Jun 8, 2021 at 12:01 AM Piotr Nowojski <pnowoj...@apache.org>
>> wrote:
>>
>>> Re-adding user mailing list
>>>
>>> Hey Alex,
>>>
>>> In that case I can see two scenarios that could lead to missing files.
>>> Keep in mind that incremental checkpoints are referencing previous
>>> checkpoints in order to minimise the size of the checkpoint (roughly
>>> speaking only changes since the previous checkpoint are being
>>> persisted/uploaded/written). Checkpoint number 42, can reference an
>>> arbitrary number of previous checkpoints. I suspect that somehow, some of
>>> those previously referenced checkpoints got deleted and removed. Also keep
>>> in mind that savepoints (as of now) are never incremental, they are always
>>> full checkpoints. However externalised checkpoints can be incremental. Back
>>> to the scenarios:
>>> 1. You might have accidentally removed some older checkpoints from your
>>> Job2, maybe thinking they are no longer needed. Maybe you have just kept
>>> this single externalised checkpoint directory from steps T3 or T4,
>>> disregarding that this externalised checkpoint might be referencing
>>> previous checkpoints of Job2?
>>> 2. As I mentioned, Flink is automatically maintaining reference counts
>>> of the used files and deletes them when they are no longer used/referenced.
>>> However this works only within a single job/cluster. For example if between
>>> steps T3 and T4, you restarted Job2 and let it run for a bit, it could take
>>> more checkpoints that would subsume files that were still part of the
>>> externalised checkpoint that you previously used to start Job3/Job4. Job2
>>> would have no idea that Job3/Job4 exist, let alone that they are
>>> referencing some files from Job2, and those files could have been deleted
>>> as soon as Job2 was no longer using/referencing them.
>>>
>>> Could one of those happen in your case?
>>>
>>> Best, Piotrek
>>>
>>> pon., 7 cze 2021 o 20:01 Alexander Filipchik <afilipc...@gmail.com>
>>> napisał(a):
>>>
>>>> Yes, we do use incremental checkpoints.
>>>>
>>>> Alex
>>>>
>>>> On Mon, Jun 7, 2021 at 3:12 AM Piotr Nowojski <pnowoj...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi Alex,
>>>>>
>>>>> A quick question. Are you using incremental checkpoints?
>>>>>
>>>>> Best, Piotrek
>>>>>
>>>>> sob., 5 cze 2021 o 21:23 <afilipc...@gmail.com> napisał(a):
>>>>>
>>>>>> Small correction, in T4 and T5 I mean Job2, not Job 1 (as job 1 was
>>>>>> save pointed).
>>>>>>
>>>>>> Thank you,
>>>>>> Alex
>>>>>>
>>>>>> On Jun 4, 2021, at 3:07 PM, Alexander Filipchik <afilipc...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>> 
>>>>>> Looked through the logs and didn't see anything fishy that indicated
>>>>>> an exception during checkpointing.
>>>>>> To make it clearer, here is the timeline (we use unaligned
>>>>>> checkpoints, and state size around 300Gb):
>>>>>>
>>>>>> T1: Job1 was running
>>>>>> T2: Job1 was savepointed, brought down and replaced with Job2.
>>>>>> T3: Attempts to savepoint Job2 failed (timed out). Job2 was
>>>>>> cancelled, brought down and replaced by Job3 that was restored from
>>>>>> extarnilized checkpoint of Job2
>>>>>> T3: Attempts to savepoint Job3 failed (timed out). Job3 was
>>>>>> cancelled, brought down and replaced by Job4 that was restored from
>>>>>> extarnilized checkpoint of Job3
>>>>>> T4: We realized that jobs were timing out to savepoint due to local
>>>>>> disk throttling. We provisioned disk with more throughput and IO. Job4 
>>>>>> was
>>>>>> cancelled, Job4 was deployed and restored from externilized checkpoint of
>>>>>> Job3, but failed as it couldn't find some files in the folder that 
>>>>>> belongs
>>>>>> to the checkpoint of *Job1*
>>>>>> T5: We tried to redeploy and restore from checkpoints of Job3 and
>>>>>> Job2, but all the attempts failed on reading files from the *folder
>>>>>> that belongs to the checkpoint of Job1*
>>>>>>
>>>>>> We checked the content of the folder containing checkpoints of Job1,
>>>>>> and it has files. Not sure what is pointing tho missing files and what
>>>>>> could've removed them.
>>>>>>
>>>>>> Any way we can figure out what could've happened? Is there a tool
>>>>>> that can read the checkpoint and check whether it is valid?
>>>>>>
>>>>>> Alex
>>>>>>
>>>>>> On Thu, Jun 3, 2021 at 2:12 PM Alexander Filipchik <
>>>>>> afilipc...@gmail.com> wrote:
>>>>>>
>>>>>>> On the checkpoints -> what kind of issues should I check for? I was
>>>>>>> looking for metrics and it looks like they were reporting successful
>>>>>>> checkpoints. It looks like some files were removed in the shared folder,
>>>>>>> but I'm not sure how to check for what caused it.
>>>>>>>
>>>>>>> Savepoints were failing due to savepoint timeout timeout. Based on
>>>>>>> metrics, our attached disks were not fast enough (GCS regional disks are
>>>>>>> network disks and were throttled). The team cancelled the savepoint and
>>>>>>> just killed the kubernetes cluster. I assume some checkpoints were
>>>>>>> interrupted as the job triggers them one after another.
>>>>>>>
>>>>>>> Is there a known issue with termination during running checkpoint?
>>>>>>>
>>>>>>> Btw, we use the Flink Kube operator from Lyft.
>>>>>>>
>>>>>>> Alex
>>>>>>>
>>>>>>> On Thu, Jun 3, 2021 at 1:24 AM Chesnay Schepler <ches...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Is there anything in the Flink logs indicating issues with writing
>>>>>>>> the checkpoint data?
>>>>>>>> When the savepoint could not be created, was anything logged from
>>>>>>>> Flink? How did you shut down the cluster?
>>>>>>>>
>>>>>>>> On 6/3/2021 5:56 AM, Alexander Filipchik wrote:
>>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> Trying to figure out what happened with our Flink job. We use flink
>>>>>>>> 1.11.1 and run a job with unaligned checkpoints and Rocks Db backend. 
>>>>>>>> The
>>>>>>>> whole state is around 300Gb judging by the size of savepoints.
>>>>>>>>
>>>>>>>> The job ran ok. At some point we tried to deploy new code, but we
>>>>>>>> couldn't take a save point as they were timing out. It looks like the
>>>>>>>> reason it was timing out was due to disk throttle (we use google 
>>>>>>>> regional
>>>>>>>> disks).
>>>>>>>> The new code was deployed using an externalized checkpoint, but it
>>>>>>>> didn't start as job was failing with:
>>>>>>>>
>>>>>>>> Caused by: java.io.FileNotFoundException: Item not found:
>>>>>>>> 'gs://../app/checkpoints/2834fa1c81dcf7c9578a8be9a371b0d1/shared/3477b236-fb4b-4a0d-be73-cb6fac62c007'.
>>>>>>>> Note, it is possible that the live version is still available but
>>>>>>>> the requested generation is deleted.
>>>>>>>>     at com.google.cloud.hadoop.gcsio.GoogleCloudStorageExceptions
>>>>>>>> .createFileNotFoundException(GoogleCloudStorageExceptions.java:45)
>>>>>>>>     at com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.open(
>>>>>>>> GoogleCloudStorageImpl.java:653)
>>>>>>>>     at com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem
>>>>>>>> .open(GoogleCloudStorageFileSystem.java:277)
>>>>>>>>     at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream
>>>>>>>> .<init>(GoogleHadoopFSInputStream.java:78)
>>>>>>>>     at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase
>>>>>>>> .open(GoogleHadoopFileSystemBase.java:620)
>>>>>>>>     at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:950)
>>>>>>>>     at com.css.flink.fs.gcs.moved.HadoopFileSystem.open(
>>>>>>>> HadoopFileSystem.java:120)
>>>>>>>>     at com.css.flink.fs.gcs.moved.HadoopFileSystem.open(
>>>>>>>> HadoopFileSystem.java:37)
>>>>>>>>     at org.apache.flink.core.fs.
>>>>>>>> PluginFileSystemFactory$ClassLoaderFixingFileSystem.open(
>>>>>>>> PluginFileSystemFactory.java:127)
>>>>>>>>     at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(
>>>>>>>> SafetyNetWrapperFileSystem.java:85)
>>>>>>>>     at org.apache.flink.runtime.state.filesystem.FileStateHandle
>>>>>>>> .openInputStream(FileStateHandle.java:69)
>>>>>>>>     at org.apache.flink.contrib.streaming.state.
>>>>>>>> RocksDBStateDownloader.downloadDataForStateHandle(
>>>>>>>> RocksDBStateDownloader.java:126)
>>>>>>>>     at org.apache.flink.contrib.streaming.state.
>>>>>>>> RocksDBStateDownloader.lambda$createDownloadRunnables$0(
>>>>>>>> RocksDBStateDownloader.java:109)
>>>>>>>>     at org.apache.flink.util.function.ThrowingRunnable
>>>>>>>> .lambda$unchecked$0(ThrowingRunnable.java:50)
>>>>>>>>     at java.util.concurrent.CompletableFuture$AsyncRun.run(
>>>>>>>> CompletableFuture.java:1640)
>>>>>>>>     at org.apache.flink.runtime.concurrent.DirectExecutorService
>>>>>>>> .execute(DirectExecutorService.java:211)
>>>>>>>>     at java.util.concurrent.CompletableFuture.asyncRunStage(
>>>>>>>> CompletableFuture.java:1654)
>>>>>>>>     at java.util.concurrent.CompletableFuture.runAsync(
>>>>>>>> CompletableFuture.java:1871)
>>>>>>>>     at org.apache.flink.contrib.streaming.state.
>>>>>>>> RocksDBStateDownloader.downloadDataForAllStateHandles(
>>>>>>>> RocksDBStateDownloader.java:83)
>>>>>>>>     at org.apache.flink.contrib.streaming.state.
>>>>>>>> RocksDBStateDownloader.transferAllStateDataToDirectory(
>>>>>>>> RocksDBStateDownloader.java:66)
>>>>>>>>     at org.apache.flink.contrib.streaming.state.restore.
>>>>>>>> RocksDBIncrementalRestoreOperation
>>>>>>>> .transferRemoteStateToLocalDirectory(
>>>>>>>> RocksDBIncrementalRestoreOperation.java:230)
>>>>>>>>     at org.apache.flink.contrib.streaming.state.restore.
>>>>>>>> RocksDBIncrementalRestoreOperation.restoreFromRemoteState(
>>>>>>>> RocksDBIncrementalRestoreOperation.java:195)
>>>>>>>>     at org.apache.flink.contrib.streaming.state.restore.
>>>>>>>> RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(
>>>>>>>> RocksDBIncrementalRestoreOperation.java:169)
>>>>>>>>     at org.apache.flink.contrib.streaming.state.restore.
>>>>>>>> RocksDBIncrementalRestoreOperation.restore(
>>>>>>>> RocksDBIncrementalRestoreOperation.java:155)
>>>>>>>>     at org.apache.flink.contrib.streaming.state.
>>>>>>>> RocksDBKeyedStateBackendBuilder.build(
>>>>>>>> RocksDBKeyedStateBackendBuilder.java:270)
>>>>>>>>     ... 15 more
>>>>>>>> We tried to roll back the code, we tried different checkpoints, but
>>>>>>>> all the attempts failed with the same error. The job ID in the error 
>>>>>>>> is not
>>>>>>>> from the same checkpoint path, it looks like restore logic
>>>>>>>> looks back at previous jobs, as all the checkpoints after
>>>>>>>> 2834fa1c81dcf7c9578a8be9a371b0d1 are failing to restore with the same 
>>>>>>>> error.
>>>>>>>> We looked at different checkpoints and found that some of them miss
>>>>>>>> metadata file and can't be used for restoration.
>>>>>>>> We also use ZK for HA, and we cleaned up the state there between
>>>>>>>> deployments to make sure the non existent file
>>>>>>>> is not coming from there.
>>>>>>>> We decided to drop the state as we have means to repopulate it, but
>>>>>>>> it would be great to get to the bottom of it. Any help will be 
>>>>>>>> appreciated.
>>>>>>>>
>>>>>>>> Alex
>>>>>>>>
>>>>>>>>
>>>>>>>>

Reply via email to