Hi,

I'm trying to figure out what happened with our Flink job. We use Flink 1.11.1
and run a job with unaligned checkpoints and the RocksDB state backend. The
whole state is around 300 GB, judging by the size of the savepoints.

The job ran fine. At some point we tried to deploy new code, but we couldn't
take a savepoint because the savepoints kept timing out. The timeouts appear
to have been caused by disk throttling (we use Google regional disks).
The new code was deployed from an externalized checkpoint instead, but it
didn't start; the job kept failing with:

Caused by: java.io.FileNotFoundException: Item not found: 'gs://../app/checkpoints/2834fa1c81dcf7c9578a8be9a371b0d1/shared/3477b236-fb4b-4a0d-be73-cb6fac62c007'. Note, it is possible that the live version is still available but the requested generation is deleted.
    at com.google.cloud.hadoop.gcsio.GoogleCloudStorageExceptions.createFileNotFoundException(GoogleCloudStorageExceptions.java:45)
    at com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.open(GoogleCloudStorageImpl.java:653)
    at com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.open(GoogleCloudStorageFileSystem.java:277)
    at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream.<init>(GoogleHadoopFSInputStream.java:78)
    at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.open(GoogleHadoopFileSystemBase.java:620)
    at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:950)
    at com.css.flink.fs.gcs.moved.HadoopFileSystem.open(HadoopFileSystem.java:120)
    at com.css.flink.fs.gcs.moved.HadoopFileSystem.open(HadoopFileSystem.java:37)
    at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.open(PluginFileSystemFactory.java:127)
    at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:85)
    at org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:69)
    at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:126)
    at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.lambda$createDownloadRunnables$0(RocksDBStateDownloader.java:109)
    at org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:50)
    at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1640)
    at org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211)
    at java.util.concurrent.CompletableFuture.asyncRunStage(CompletableFuture.java:1654)
    at java.util.concurrent.CompletableFuture.runAsync(CompletableFuture.java:1871)
    at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForAllStateHandles(RocksDBStateDownloader.java:83)
    at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.transferAllStateDataToDirectory(RocksDBStateDownloader.java:66)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.transferRemoteStateToLocalDirectory(RocksDBIncrementalRestoreOperation.java:230)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:195)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:169)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:155)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:270)
    ... 15 more
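
For reference, the object path in the exception appears to follow Flink's
checkpoint directory layout, <base>/<job-id>/shared/<file>. Below is a rough
sketch of how the job ID that wrote the missing file could be pulled out of
the message and compared with the job being restored. The helper names
(parse_shared_path, written_by_other_job) are made up for illustration, and
the regex assumes a 32-character hex job ID.

```python
import re

# Assumption: shared incremental state lives under <base>/<job-id>/shared/<file>,
# where <job-id> is a 32-character lowercase hex string.
SHARED_FILE = re.compile(r"/(?P<job_id>[0-9a-f]{32})/shared/(?P<file>[^/']+)")


def parse_shared_path(message):
    """Return (job_id, file_name) for the first shared-state path found, or None."""
    m = SHARED_FILE.search(message)
    return (m.group("job_id"), m.group("file")) if m else None


def written_by_other_job(message, restoring_job_id):
    """True if the missing file sits under a different job's directory."""
    parsed = parse_shared_path(message)
    return parsed is not None and parsed[0] != restoring_job_id


# The message text is copied from the exception above (path left truncated as posted).
msg = ("Item not found: 'gs://../app/checkpoints/"
       "2834fa1c81dcf7c9578a8be9a371b0d1/shared/"
       "3477b236-fb4b-4a0d-be73-cb6fac62c007'.")
print(parse_shared_path(msg))
```

Comparing the extracted job ID with the job ID in the checkpoint path passed
to the restore shows quickly whether the missing file belongs to an earlier
job's shared directory.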

We tried rolling back the code and restoring from different checkpoints, but
every attempt failed with the same error. The job ID in the error does not
belong to the checkpoint path we were restoring from. It looks like the
restore logic reaches back into files written by previous jobs, since all the
checkpoints taken after 2834fa1c81dcf7c9578a8be9a371b0d1 fail to restore with
the same error.
We also looked at different checkpoints and found that some of them are
missing the metadata file and can't be used for restoration.
We use ZK for HA as well, and we cleaned up the state there between
deployments to make sure the reference to the nonexistent file was not coming
from there.
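
In case it helps anyone reproduce the check for unusable checkpoints, here is
a minimal sketch of how a recursive bucket listing could be scanned for
chk-<n> directories that have no _metadata file. It assumes the listing is
available as plain object paths, one per string; the bucket name and job ID in
the sample are placeholders, and the function name is invented for this
example. A checkpoint that is still in progress would also show up here, so
the result is only a hint.

```python
import re
from collections import defaultdict

# Matches objects stored directly inside a chk-<n> directory.
CHK_OBJECT = re.compile(r"^(?P<chk>.*/chk-\d+)/(?P<name>[^/]+)$")


def checkpoints_missing_metadata(object_paths):
    """Return sorted chk-<n> directories that hold objects but no _metadata file."""
    contents = defaultdict(set)
    for path in object_paths:
        m = CHK_OBJECT.match(path.strip())
        if m:
            contents[m.group("chk")].add(m.group("name"))
    return sorted(chk for chk, names in contents.items()
                  if "_metadata" not in names)


# Placeholder listing; bucket and job ID are hypothetical.
base = "gs://example-bucket/app/checkpoints/" + "a" * 32
listing = [
    base + "/chk-41/_metadata",
    base + "/chk-41/0b1c2d",
    base + "/chk-42/9f8e7d",    # no _metadata next to it
    base + "/shared/3c4d5e",    # shared files are ignored by the check
]
print(checkpoints_missing_metadata(listing))
```

Directories that contain no objects at all never appear in an object-store
listing, so they will not be reported by this check.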

We decided to drop the state since we have the means to repopulate it, but it
would be great to get to the bottom of this. Any help will be appreciated.

Alex
