Hi Arvid,

I went through the code, Confluence, and Jira for FLIP-147. I couldn't determine whether it's possible to manually trigger a savepoint/checkpoint, as I couldn't find any Javadoc APIs for it.

Also, would the setting "ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH" still create a checkpoint if my entire bounded job finishes before the checkpoint interval elapses?
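
For reference, the setting asked about above can be written as a configuration fragment. This is only a sketch: it assumes the Flink 1.14 option key that backs ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, and the interval value is a hypothetical placeholder that does not come from this thread. It does not answer whether a final checkpoint is taken when the job ends before the first interval.

```yaml
# Assumed Flink 1.14 key behind ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH
execution.checkpointing.checkpoints-after-tasks-finish.enabled: true

# Hypothetical interval, chosen only for illustration; a short bounded job
# may finish before this period elapses, which is the case in question
execution.checkpointing.interval: 10min
```
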
On Tue, 13 Jul 2021 at 17:58, Rakshit Ramesh <rakshit.ram...@datakaveri.org> wrote:

> That's great news!
> Thanks.
>
> On Tue, 13 Jul 2021 at 14:17, Arvid Heise <ar...@apache.org> wrote:
>
>> The only known workaround is to provide your own source(function) that
>> doesn't finish until all of the source subtasks finish and a final
>> checkpoint is completed. However, coordinating the sources with the old
>> SourceFunction interface requires some external tool.
>>
>> FLIP-147 is targeted for 1.14 in August.
>>
>> On Sat, Jul 10, 2021 at 7:46 PM Rakshit Ramesh <
>> rakshit.ram...@datakaveri.org> wrote:
>>
>>> Hi Arvid,
>>> Since I'm trying to save checkpoints for a bounded process
>>> the checkpoint isn't being written on time since the job finishes before
>>> that can happen.
>>>
>>> Looks like one major feature that would be required for this to work is
>>> discussed in FLIP-147
>>>
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished
>>>
>>> Is there any userland workaround for this?
>>>
>>> Thanks!
>>>
>>> On Thu, 8 Jul 2021 at 11:52, Rakshit Ramesh <
>>> rakshit.ram...@datakaveri.org> wrote:
>>>
>>>> Yes! I was only worried about the jobid changing and the checkpoint
>>>> being un-referenceable.
>>>> But since I can pass a path to the checkpoint that will not be an issue.
>>>>
>>>> Thanks a lot for your suggestions!
>>>>
>>>> On Thu, 8 Jul 2021 at 11:26, Arvid Heise <ar...@apache.org> wrote:
>>>>
>>>>> Hi Rakshit,
>>>>>
>>>>> It sounds to me as if you don't need the Savepoint API at all. You can
>>>>> (re)start all applications with the previous state (be it retained
>>>>> checkpoint or savepoint). You just need to provide the path to that in your
>>>>> application invocation [1] (every entry point has such a parameter, you
>>>>> might need to check the respective documentation if you are not using
>>>>> CLI).
>>>>> Note that although it only says savepoint, starting from a checkpoint is
>>>>> fine as well (just not recommended in the beginning).
>>>>>
>>>>> [1]
>>>>> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/cli/#starting-a-job-from-a-savepoint
>>>>>
>>>>> On Thu, Jul 8, 2021 at 6:31 AM Rakshit Ramesh <
>>>>> rakshit.ram...@datakaveri.org> wrote:
>>>>>
>>>>>> Sorry for being a little vague there.
>>>>>> I want to create a Savepoint from a DataStream right before the job
>>>>>> is finished or cancelled.
>>>>>> What you have shown in the IT case is how a datastream can be
>>>>>> bootstrapped with state that is
>>>>>> formed by means of DataSet.
>>>>>> My jobs are triggered by a scheduler periodically (every day) using
>>>>>> the api and I would like
>>>>>> to bootstrap each day's job with the state of the previous day.
>>>>>>
>>>>>> But thanks for the input on the Checkpoint behaviour wrt a FINISHED
>>>>>> state,
>>>>>> I think that will work for me.
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>> On Thu, 8 Jul 2021 at 02:03, Arvid Heise <ar...@apache.org> wrote:
>>>>>>
>>>>>>> I don't quite understand your question. You use Savepoint API to
>>>>>>> create a savepoint with a batch job (that's why it's DataSet Transform
>>>>>>> currently). That savepoint can only be restored through a datastream
>>>>>>> application. Dataset applications cannot start from a savepoint.
>>>>>>>
>>>>>>> So I don't understand why you see a difference between "restoring a
>>>>>>> savepoint to a datastream" and "create a NewSavepoint for a datastream".
>>>>>>> It's ultimately the same thing for me. Just to be very clear: the main
>>>>>>> purpose of Savepoint API is to create the initial state of a datastream
>>>>>>> application.
>>>>>>>
>>>>>>> For your second question, yes retained checkpoints outlive the job
>>>>>>> in all regards. It's the user's responsibility to eventually clean that
>>>>>>> up.
>>>>>>>
>>>>>>> On Wed, Jul 7, 2021 at 6:56 PM Rakshit Ramesh <
>>>>>>> rakshit.ram...@datakaveri.org> wrote:
>>>>>>>
>>>>>>>> Yes I could understand restoring a savepoint to a datastream.
>>>>>>>> What I couldn't figure out is to create a NewSavepoint for a
>>>>>>>> datastream.
>>>>>>>> What I understand is that NewSavepoints only take in Bootstrap
>>>>>>>> transformation for Dataset Transform functions.
>>>>>>>>
>>>>>>>> About the checkpoints, does
>>>>>>>> CheckpointConfig.ExternalizedCheckpointCleanup =
>>>>>>>> RETAIN_ON_CANCELLATION
>>>>>>>> offer the same behaviour when the job is "FINISHED" and not
>>>>>>>> "CANCELLED" ?
>>>>>>>>
>>>>>>>> What I'm looking for is a way to retain the state for a bounded job
>>>>>>>> so that the state is reloaded on the next job run (through api).
>>>>>>>>
>>>>>>>> On Wed, 7 Jul 2021 at 14:18, Arvid Heise <ar...@apache.org> wrote:
>>>>>>>>
>>>>>>>>> Hi Rakshit,
>>>>>>>>>
>>>>>>>>> The example is valid. The state processor API is kinda working
>>>>>>>>> like a DataSet application but the state is meant to be read in
>>>>>>>>> DataStream.
>>>>>>>>> Please check out the SavepointWriterITCase [1] for a full example.
>>>>>>>>> There is
>>>>>>>>> no checkpoint/savepoint in DataSet applications.
>>>>>>>>>
>>>>>>>>> Checkpoints can be stored on different checkpoint storages, such
>>>>>>>>> as S3 or HDFS. If you use RocksDB state backend, Flink pretty much
>>>>>>>>> just
>>>>>>>>> copies the SST files of RocksDB to S3. Checkpoints are usually bound to
>>>>>>>>> the
>>>>>>>>> life of an application. So they are created by the application and
>>>>>>>>> deleted
>>>>>>>>> on termination.
>>>>>>>>> However, you can resume an application both from savepoint and
>>>>>>>>> checkpoints. Checkpoints can be retained [2] to avoid them being
>>>>>>>>> deleted by
>>>>>>>>> the application during termination. But that's considered an advanced
>>>>>>>>> feature and you should first try it with savepoints.
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://github.com/apache/flink/blob/release-1.13.0/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java#L141-L141
>>>>>>>>> [2]
>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/state/checkpoints/#retained-checkpoints
>>>>>>>>>
>>>>>>>>> On Mon, Jul 5, 2021 at 5:56 PM Rakshit Ramesh <
>>>>>>>>> rakshit.ram...@datakaveri.org> wrote:
>>>>>>>>>
>>>>>>>>>> I'm trying to bootstrap state into a KeyedProcessFunction
>>>>>>>>>> equivalent that takes in
>>>>>>>>>> a DataStream but I'm unable to find a reference for the same.
>>>>>>>>>> I found this gist
>>>>>>>>>>
>>>>>>>>>> https://gist.github.com/alpinegizmo/ff3d2e748287853c88f21259830b29cf
>>>>>>>>>> But it seems to only apply for DataSet.
>>>>>>>>>> My use case is to manually trigger a Savepoint into s3 for later
>>>>>>>>>> reuse.
>>>>>>>>>> I'm also guessing that checkpoints can't be stored in rocksdb or
>>>>>>>>>> s3 for later reuse.
>>>>>>>>>>
>>>>>>>>>