Hi Rakshit, It sounds to me as if you don't need the Savepoint API at all. You can (re)start all applications with the previous state (be it retained checkpoint or savepoint). You just need to provide the path to that in your application invocation [1] (every entry point has such a parameter, you might need to check the respective documentation if you are not using CLI). Note that although it only says savepoint, starting from a checkpoint is fine as well (just not recommended in the beginning).
[1] https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/cli/#starting-a-job-from-a-savepoint On Thu, Jul 8, 2021 at 6:31 AM Rakshit Ramesh <rakshit.ram...@datakaveri.org> wrote: > Sorry for being a little vague there. > I want to create a Savepoint from a DataStream right before the job is > finished or cancelled. > What you have shown in the IT case is how a datastream can be bootstrapped > with state that is > formed formed by means of DataSet. > My jobs are triggered by a scheduler periodically (every day) using the > api and I would like > to bootstrap each day's job with the state of the previous day. > > But thanks for the input on the Checkpoint behaviour wrt a FINISHED state, > I think that will work for me. > > Thanks! > > On Thu, 8 Jul 2021 at 02:03, Arvid Heise <ar...@apache.org> wrote: > >> I don't quite understand your question. You use Savepoint API to create a >> savepoint with a batch job (that's why it's DataSet Transform currently). >> That savepoint can only be restored through a datastream application. >> Dataset applications cannot start from a savepoint. >> >> So I don't understand why you see a difference between "restoring a >> savepoint to a datastream" and "create a NewSavepoint for a datastream". >> It's ultimately the same thing for me. Just to be very clear: the main >> purpose of Savepoint API is to create the initial state of a datastream >> application. >> >> For your second question, yes retained checkpoints outlive the job in all >> regards. It's the users responsibility to eventually clean that up. >> >> >> >> On Wed, Jul 7, 2021 at 6:56 PM Rakshit Ramesh < >> rakshit.ram...@datakaveri.org> wrote: >> >>> Yes I could understand restoring a savepoint to a datastream. >>> What I couldn't figure out is to create a NewSavepoint for a datastream. >>> What I understand is that NewSavepoints only take in Bootstrap >>> transformation for Dataset Transform functions. >>> >>> >>> About the checkpoints, does >>> CheckpointConfig.ExternalizedCheckpointCleanup = RETAIN_ON_CANCELLATION >>> offer the same behaviour when the job is "FINISHED" and not "CANCELLED" ? >>> >>> What I'm looking for is a way to retain the state for a bounded job so >>> that the state is reloaded on the next job run (through api). >>> >>> On Wed, 7 Jul 2021 at 14:18, Arvid Heise <ar...@apache.org> wrote: >>> >>>> Hi Rakshit, >>>> >>>> The example is valid. The state processor API is kinda working like a >>>> DataSet application but the state is meant to be read in DataStream. Please >>>> check out the SavepointWriterITCase [1] for a full example. There is no >>>> checkpoint/savepoint in DataSet applications. >>>> >>>> Checkpoints can be stored on different checkpoint storages, such as S3 >>>> or HDFS. If you use RocksDB state backend, Flink pretty much just copy the >>>> SST files of RocksDB to S3. Checkpoints are usually bound to the life of an >>>> application. So they are created by the application and deleted on >>>> termination. >>>> However, you can resume an application both from savepoint and >>>> checkpoints. Checkpoints can be retained [2] to avoid them being deleted by >>>> the application during termination. But that's considered an advanced >>>> feature and you should first try it with savepoints. >>>> >>>> [1] >>>> https://github.com/apache/flink/blob/release-1.13.0/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java#L141-L141 >>>> [2] >>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/state/checkpoints/#retained-checkpoints >>>> >>>> On Mon, Jul 5, 2021 at 5:56 PM Rakshit Ramesh < >>>> rakshit.ram...@datakaveri.org> wrote: >>>> >>>>> I'm trying to bootstrap state into a KeyedProcessFunction equivalent >>>>> that takes in >>>>> a DataStream but I'm unable to find a reference for the same. >>>>> I found this gist >>>>> https://gist.github.com/alpinegizmo/ff3d2e748287853c88f21259830b29cf >>>>> But it seems to only apply for DataSet. >>>>> My usecase is to manually trigger a Savepoint into s3 for later reuse. >>>>> I'm also guessing that checkpoints can't be stored in rocksdb or s3 >>>>> for later reuse. >>>>> >>>>