Hi Till & others,

We enabled `setFailOnCheckpointingErrors` (`setTolerableCheckpointFailureNumber` isn't available in 1.8) and this indeed prevents the large number of restarts.
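To illustrate the tolerable-failure idea discussed in this thread, here is a small toy model in plain Java. It is not Flink code: `FailureTracker` and `countRestarts` are hypothetical names, and the rules it encodes (a successful checkpoint resets the count, and the job fails once the consecutive failures exceed the tolerated number) are assumptions made for the sketch rather than a statement of how Flink implements it.

```java
import java.util.List;

class TolerableFailureSketch {

    /** Hypothetical stand-in for a checkpoint failure counter; not a Flink class. */
    static final class FailureTracker {
        private final int tolerableFailures;
        private int consecutiveFailures = 0;

        FailureTracker(int tolerableFailures) {
            this.tolerableFailures = tolerableFailures;
        }

        /** Records one checkpoint outcome; returns true if the job should now fail. */
        boolean record(boolean checkpointSucceeded) {
            if (checkpointSucceeded) {
                consecutiveFailures = 0; // assumption: a success resets the count
                return false;
            }
            consecutiveFailures++;
            return consecutiveFailures > tolerableFailures;
        }
    }

    /** Counts how many restarts a sequence of checkpoint outcomes would trigger. */
    static int countRestarts(int tolerableFailures, List<Boolean> outcomes) {
        FailureTracker tracker = new FailureTracker(tolerableFailures);
        int restarts = 0;
        for (boolean ok : outcomes) {
            if (tracker.record(ok)) {
                restarts++;
                // assumption: the count starts fresh after a restart
                tracker = new FailureTracker(tolerableFailures);
            }
        }
        return restarts;
    }

    public static void main(String[] args) {
        // true = checkpoint succeeded, false = checkpoint failed
        List<Boolean> outcomes = List.of(true, false, true, false, false, true, false);
        System.out.println("tolerate 0: " + countRestarts(0, outcomes) + " restarts");
        System.out.println("tolerate 2: " + countRestarts(2, outcomes) + " restarts");
    }
}
```

In this model, an occasional failed checkpoint no longer restarts the job; only a run of failures longer than the tolerated number does.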
Hopefully a solution for the reported issue[1] with google gets found but for now this solved our immediate problem.

Thanks again!

[1] https://issuetracker.google.com/issues/137168102

Regards,
Richard

On Thu, Jan 30, 2020 at 11:40 AM Arvid Heise <ar...@ververica.com> wrote:

> If a checkpoint is not successful, it cannot be used for recovery.
> That means Flink will restart to the last successful checkpoint and hence
> not lose any data.
>
> On Wed, Jan 29, 2020 at 9:52 PM wvl <lee...@gmail.com> wrote:
>
>> Forgive my lack of knowledge here - I'm a bit out of my league here.
>>
>> But I was wondering if allowing e.g. 1 checkpoint to fail and the reason
>> for which somehow caused a record to be lost (e.g. rocksdb exception /
>> taskmanager crash / etc), there would be no Source rewind to the last
>> successful checkpoint and this record would be lost forever, correct?
>>
>> On Wed, 29 Jan 2020, 17:51 Richard Deurwaarder, <rich...@xeli.eu> wrote:
>>
>>> Hi Till,
>>>
>>> I'll see if we can ask google to comment on those issues, perhaps they
>>> have a fix in the works that would solve the root problem.
>>> In the meanwhile
>>> `CheckpointConfig.setTolerableCheckpointFailureNumber` sounds very
>>> promising!
>>> Thank you for this. I'm going to try this tomorrow to see if that helps.
>>> I will let you know!
>>>
>>> Richard
>>>
>>> On Wed, Jan 29, 2020 at 3:47 PM Till Rohrmann <trohrm...@apache.org>
>>> wrote:
>>>
>>>> Hi Richard,
>>>>
>>>> googling a bit indicates that this might actually be a GCS problem [1,
>>>> 2, 3]. The proposed solution/workaround so far is to retry the whole upload
>>>> operation as part of the application logic. Since I assume that you are
>>>> writing to GCS via Hadoop's file system this should actually fall into the
>>>> realm of the Hadoop file system implementation and not Flink.
>>>>
>>>> What you could do to mitigate the problem a bit is to set the number of
>>>> tolerable checkpoint failures to a non-zero value via
>>>> `CheckpointConfig.setTolerableCheckpointFailureNumber`. Setting this to `n`
>>>> means that the job will only fail and then restart after `n` checkpoint
>>>> failures. Unfortunately, we do not support a failure rate yet.
>>>>
>>>> [1] https://github.com/googleapis/google-cloud-java/issues/3586
>>>> [2] https://github.com/googleapis/google-cloud-java/issues/5704
>>>> [3] https://issuetracker.google.com/issues/137168102
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Tue, Jan 28, 2020 at 6:25 PM Richard Deurwaarder <rich...@xeli.eu>
>>>> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> We've got a Flink job running on 1.8.0 which writes its state
>>>>> (rocksdb) to Google Cloud Storage[1]. We've noticed that jobs with a large
>>>>> amount of state (500 GB range) are becoming *very* unstable, restarting
>>>>> on the order of once an hour or even more.
>>>>>
>>>>> The reason for this instability is that we run into "410 Gone"[4]
>>>>> errors from Google Cloud Storage. This indicates an upload (write from
>>>>> Flink's perspective) took place and it wanted to resume the write[2] but
>>>>> could not find the file which it needed to resume. My guess is this is
>>>>> because the previous attempt either failed or perhaps it uploads in chunks
>>>>> of 67 MB [3].
>>>>>
>>>>> The library logs this line when this happens:
>>>>>
>>>>> "Encountered status code 410 when accessing URL
>>>>> https://www.googleapis.com/upload/storage/v1/b/<project>/o?ifGenerationMatch=0&name=job-manager/15aa2391-a055-4bfd-8d82-e9e4806baa9c/8ae818761055cdc022822010a8b4a1ed/chk-52224/_metadata&uploadType=resumable&upload_id=AEnB2UqJwkdrQ8YuzqrTp9Nk4bDnzbuJcTlD5E5hKNLNz4xQ7vjlYrDzYC29ImHcp0o6OjSCmQo6xkDSj5OHly7aChH0JxxXcg.
>>>>> Delegating to response handler for possible retry."
>>>>>
>>>>> We're kind of stuck on these questions:
>>>>> * Is Flink capable of doing these retries?
>>>>> * Does anyone successfully write their (rocksdb) state to Google Cloud
>>>>> Storage for bigger state sizes?
>>>>> * Is it possible Flink renames or deletes certain directories before
>>>>> all flushes have been done based on an atomic guarantee provided by HDFS
>>>>> that does not hold on other implementations perhaps? A race condition of
>>>>> sorts.
>>>>>
>>>>> Basically does anyone recognize this behavior?
>>>>>
>>>>> Regards,
>>>>>
>>>>> Richard Deurwaarder
>>>>>
>>>>> [1] We use an HDFS implementation provided by Google
>>>>> https://github.com/GoogleCloudDataproc/bigdata-interop/tree/master/gcs
>>>>> [2]
>>>>> https://cloud.google.com/storage/docs/json_api/v1/status-codes#410_Gone
>>>>> [3]
>>>>> https://github.com/GoogleCloudDataproc/bigdata-interop/blob/master/gcs/CONFIGURATION.md
>>>>> (see fs.gs.outputstream.upload.chunk.size)
>>>>> [4] Stacktrace:
>>>>> https://gist.github.com/Xeli/da4c0af2c49c060139ad01945488e492
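
The workaround mentioned in the thread, retrying the whole upload rather than resuming the failed session, might look roughly like the sketch below. This is a generic pattern in plain Java, not code from Flink or from the GCS connector: `retryFromScratch` is a hypothetical helper, and the simulated failure in `main` merely stands in for a "410 Gone" response. The key assumption is that the operation passed in starts the upload from the beginning on every call.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

class WholeUploadRetrySketch {

    /**
     * Runs the operation from scratch up to maxAttempts times, doubling the
     * sleep between attempts. The operation must restart the upload itself
     * on each call; it must not try to resume a previous session.
     */
    static <T> T retryFromScratch(Callable<T> operation, int maxAttempts, long initialBackoffMillis)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        long backoff = initialBackoffMillis;
        IOException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return operation.call();
            } catch (IOException e) {
                last = e; // only I/O failures are retried; other exceptions propagate
                if (attempt < maxAttempts) {
                    Thread.sleep(backoff);
                    backoff *= 2;
                }
            }
        }
        throw last; // all attempts failed with an I/O error
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Hypothetical upload: fails twice with a simulated "410 Gone", then succeeds.
        String result = retryFromScratch(() -> {
            calls[0]++;
            if (calls[0] < 3) {
                throw new IOException("simulated 410 Gone");
            }
            return "uploaded";
        }, 5, 10L);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

As noted in the thread, this kind of retry would more naturally live in the Hadoop file system implementation than in Flink or in the job itself, so the sketch only shows the shape of the logic.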