Sorry for being a little vague there. I want to create a Savepoint from a DataStream right before the job is finished or cancelled. What you have shown in the IT case is how a DataStream can be bootstrapped with state that is formed by means of a DataSet. My jobs are triggered periodically (every day) by a scheduler using the API, and I would like to bootstrap each day's job with the state of the previous day.
But thanks for the input on the checkpoint behaviour wrt a FINISHED state, I think that will work for me.
Thanks!

On Thu, 8 Jul 2021 at 02:03, Arvid Heise <ar...@apache.org> wrote:

> I don't quite understand your question. You use Savepoint API to create a
> savepoint with a batch job (that's why it's DataSet Transform currently).
> That savepoint can only be restored through a datastream application.
> Dataset applications cannot start from a savepoint.
>
> So I don't understand why you see a difference between "restoring a
> savepoint to a datastream" and "create a NewSavepoint for a datastream".
> It's ultimately the same thing for me. Just to be very clear: the main
> purpose of Savepoint API is to create the initial state of a datastream
> application.
>
> For your second question, yes retained checkpoints outlive the job in all
> regards. It's the user's responsibility to eventually clean that up.
>
> On Wed, Jul 7, 2021 at 6:56 PM Rakshit Ramesh <
> rakshit.ram...@datakaveri.org> wrote:
>
>> Yes I could understand restoring a savepoint to a datastream.
>> What I couldn't figure out is to create a NewSavepoint for a datastream.
>> What I understand is that NewSavepoints only take in Bootstrap
>> transformation for Dataset Transform functions.
>>
>> About the checkpoints, does
>> CheckpointConfig.ExternalizedCheckpointCleanup = RETAIN_ON_CANCELLATION
>> offer the same behaviour when the job is "FINISHED" and not "CANCELLED" ?
>>
>> What I'm looking for is a way to retain the state for a bounded job so
>> that the state is reloaded on the next job run (through api).
>>
>> On Wed, 7 Jul 2021 at 14:18, Arvid Heise <ar...@apache.org> wrote:
>>
>>> Hi Rakshit,
>>>
>>> The example is valid. The state processor API is kinda working like a
>>> DataSet application but the state is meant to be read in DataStream. Please
>>> check out the SavepointWriterITCase [1] for a full example. There is no
>>> checkpoint/savepoint in DataSet applications.
>>>
>>> Checkpoints can be stored on different checkpoint storages, such as S3
>>> or HDFS. If you use RocksDB state backend, Flink pretty much just copies the
>>> SST files of RocksDB to S3. Checkpoints are usually bound to the life of an
>>> application. So they are created by the application and deleted on
>>> termination.
>>> However, you can resume an application both from savepoints and
>>> checkpoints. Checkpoints can be retained [2] to avoid them being deleted by
>>> the application during termination. But that's considered an advanced
>>> feature and you should first try it with savepoints.
>>>
>>> [1]
>>> https://github.com/apache/flink/blob/release-1.13.0/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java#L141-L141
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/state/checkpoints/#retained-checkpoints
>>>
>>> On Mon, Jul 5, 2021 at 5:56 PM Rakshit Ramesh <
>>> rakshit.ram...@datakaveri.org> wrote:
>>>
>>>> I'm trying to bootstrap state into a KeyedProcessFunction equivalent
>>>> that takes in
>>>> a DataStream but I'm unable to find a reference for the same.
>>>> I found this gist
>>>> https://gist.github.com/alpinegizmo/ff3d2e748287853c88f21259830b29cf
>>>> But it seems to only apply for DataSet.
>>>> My usecase is to manually trigger a Savepoint into s3 for later reuse.
>>>> I'm also guessing that checkpoints can't be stored in rocksdb or s3 for
>>>> later reuse.
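
For the daily-run use case discussed in this thread, the scheduler would need the path of the most recent retained checkpoint before it starts the next job. Below is a minimal sketch of that lookup, not a definitive implementation. It assumes the documented retained-checkpoint layout `<checkpoint-dir>/<job-id>/chk-<n>/_metadata` and, purely for illustration, a checkpoint directory reachable through the local filesystem; with S3 the same selection logic would have to run against an object-store listing instead. The class name `LatestCheckpoint` and the method `find` are hypothetical names, not part of Flink.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;

// Hypothetical helper (not part of Flink): picks the newest retained checkpoint.
public class LatestCheckpoint {

    // Assumed layout: <checkpoint-dir>/<job-id>/chk-<n>/_metadata
    private static final Pattern CHK_DIR = Pattern.compile("chk-(\\d+)");

    /** Returns the highest-numbered chk-<n> directory that contains a _metadata file. */
    public static Optional<Path> find(Path jobCheckpointDir) throws IOException {
        try (Stream<Path> children = Files.list(jobCheckpointDir)) {
            return children
                    .filter(Files::isDirectory)
                    .filter(p -> CHK_DIR.matcher(p.getFileName().toString()).matches())
                    // Skip directories without _metadata; this sketch treats them as incomplete.
                    .filter(p -> Files.isRegularFile(p.resolve("_metadata")))
                    .max(Comparator.comparingLong(LatestCheckpoint::checkpointId));
        }
    }

    // Extracts <n> from a directory named chk-<n>.
    private static long checkpointId(Path chkDir) {
        Matcher m = CHK_DIR.matcher(chkDir.getFileName().toString());
        if (!m.matches()) {
            throw new IllegalArgumentException("Not a checkpoint directory: " + chkDir);
        }
        return Long.parseLong(m.group(1));
    }

    public static void main(String[] args) throws IOException {
        // args[0] is a placeholder for the previous run's directory, e.g. <checkpoint-dir>/<job-id>
        Optional<Path> latest = find(Paths.get(args[0]));
        System.out.println(latest.map(Path::toString).orElse("no completed checkpoint found"));
    }
}
```

The printed path is what would be handed to the next day's submission as the restore path, for example via `flink run -s <path>`. The sketch compares the numeric suffix rather than sorting names as strings, so `chk-10` is ranked after `chk-2`.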