That's great news! Thanks.

On Tue, 13 Jul 2021 at 14:17, Arvid Heise <ar...@apache.org> wrote:
> The only known workaround is to provide your own source(function) that doesn't finish until all of the source subtasks finish and a final checkpoint is completed. However, coordinating the sources with the old SourceFunction interface requires some external tool.
>
> FLIP-147 is targeted for 1.14 in August.
>
> On Sat, Jul 10, 2021 at 7:46 PM Rakshit Ramesh <rakshit.ram...@datakaveri.org> wrote:
>
>> Hi Arvid,
>> Since I'm trying to save checkpoints for a bounded process, the checkpoint isn't being written in time because the job finishes before that can happen.
>>
>> Looks like one major feature that would be required for this to work is discussed in FLIP-147:
>>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished
>>
>> Is there any userland workaround for this?
>>
>> Thanks!
>>
>> On Thu, 8 Jul 2021 at 11:52, Rakshit Ramesh <rakshit.ram...@datakaveri.org> wrote:
>>
>>> Yes! I was only worried about the job ID changing and the checkpoint becoming un-referenceable.
>>> But since I can pass a path to the checkpoint, that will not be an issue.
>>>
>>> Thanks a lot for your suggestions!
>>>
>>> On Thu, 8 Jul 2021 at 11:26, Arvid Heise <ar...@apache.org> wrote:
>>>
>>>> Hi Rakshit,
>>>>
>>>> It sounds to me as if you don't need the Savepoint API at all. You can (re)start all applications with the previous state (be it a retained checkpoint or a savepoint). You just need to provide the path to it in your application invocation [1] (every entry point has such a parameter; you might need to check the respective documentation if you are not using the CLI).
>>>> Note that although it only says savepoint, starting from a checkpoint is fine as well (just not recommended in the beginning).
>>>>
>>>> [1] https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/cli/#starting-a-job-from-a-savepoint
>>>>
>>>> On Thu, Jul 8, 2021 at 6:31 AM Rakshit Ramesh <rakshit.ram...@datakaveri.org> wrote:
>>>>
>>>>> Sorry for being a little vague there.
>>>>> I want to create a Savepoint from a DataStream right before the job is finished or cancelled.
>>>>> What you have shown in the IT case is how a DataStream can be bootstrapped with state that is formed by means of DataSet.
>>>>> My jobs are triggered periodically (every day) by a scheduler using the API, and I would like to bootstrap each day's job with the state of the previous day.
>>>>>
>>>>> But thanks for the input on the checkpoint behaviour with respect to a FINISHED state; I think that will work for me.
>>>>>
>>>>> Thanks!
>>>>>
>>>>> On Thu, 8 Jul 2021 at 02:03, Arvid Heise <ar...@apache.org> wrote:
>>>>>
>>>>>> I don't quite understand your question. You use the Savepoint API to create a savepoint with a batch job (that's why it's DataSet Transform currently). That savepoint can only be restored through a DataStream application. DataSet applications cannot start from a savepoint.
>>>>>>
>>>>>> So I don't understand why you see a difference between "restoring a savepoint to a datastream" and "create a NewSavepoint for a datastream". It's ultimately the same thing for me. Just to be very clear: the main purpose of the Savepoint API is to create the initial state of a DataStream application.
>>>>>>
>>>>>> For your second question, yes, retained checkpoints outlive the job in all regards. It's the user's responsibility to eventually clean them up.
>>>>>>
>>>>>> On Wed, Jul 7, 2021 at 6:56 PM Rakshit Ramesh <rakshit.ram...@datakaveri.org> wrote:
>>>>>>
>>>>>>> Yes, I could understand restoring a savepoint to a datastream.
>>>>>>> What I couldn't figure out is how to create a NewSavepoint for a datastream.
>>>>>>> What I understand is that NewSavepoint only takes in BootstrapTransformations for DataSet transform functions.
>>>>>>>
>>>>>>> About the checkpoints, does CheckpointConfig.ExternalizedCheckpointCleanup = RETAIN_ON_CANCELLATION offer the same behaviour when the job is "FINISHED" and not "CANCELLED"?
>>>>>>>
>>>>>>> What I'm looking for is a way to retain the state for a bounded job so that the state is reloaded on the next job run (through the API).
>>>>>>>
>>>>>>> On Wed, 7 Jul 2021 at 14:18, Arvid Heise <ar...@apache.org> wrote:
>>>>>>>
>>>>>>>> Hi Rakshit,
>>>>>>>>
>>>>>>>> The example is valid. The state processor API kinda works like a DataSet application, but the state is meant to be read in DataStream. Please check out the SavepointWriterITCase [1] for a full example. There is no checkpoint/savepoint in DataSet applications.
>>>>>>>>
>>>>>>>> Checkpoints can be stored on different checkpoint storages, such as S3 or HDFS. If you use the RocksDB state backend, Flink pretty much just copies the SST files of RocksDB to S3. Checkpoints are usually bound to the life of an application: they are created by the application and deleted on termination.
>>>>>>>> However, you can resume an application from both savepoints and checkpoints. Checkpoints can be retained [2] to avoid them being deleted by the application during termination. But that's considered an advanced feature, and you should first try it with savepoints.
>>>>>>>>
>>>>>>>> [1] https://github.com/apache/flink/blob/release-1.13.0/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java#L141-L141
>>>>>>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/state/checkpoints/#retained-checkpoints
>>>>>>>>
>>>>>>>> On Mon, Jul 5, 2021 at 5:56 PM Rakshit Ramesh <rakshit.ram...@datakaveri.org> wrote:
>>>>>>>>
>>>>>>>>> I'm trying to bootstrap state into a KeyedProcessFunction equivalent that takes in a DataStream, but I'm unable to find a reference for the same.
>>>>>>>>> I found this gist:
>>>>>>>>> https://gist.github.com/alpinegizmo/ff3d2e748287853c88f21259830b29cf
>>>>>>>>> But it seems to only apply to DataSet.
>>>>>>>>> My use case is to manually trigger a Savepoint into S3 for later reuse.
>>>>>>>>> I'm also guessing that checkpoints can't be stored in RocksDB or S3 for later reuse.
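The workaround Arvid describes in the latest reply (a source that does not finish until a final checkpoint has completed) can be sketched, for a single source subtask, as a small gate object. This is a minimal sketch, not a Flink API: `FinalCheckpointGate` and its methods are hypothetical. The assumed wiring in a legacy `SourceFunction` that also implements `CheckpointedFunction` and `CheckpointListener` is: call `markEmissionDone()` after the last `ctx.collect(...)`, `onSnapshot(context.getCheckpointId())` from `snapshotState(...)`, `onCheckpointComplete(id)` from `notifyCheckpointComplete(id)`, and block in `awaitFinalCheckpoint(...)` (without holding `ctx.getCheckpointLock()`) before returning from `run()`. With several source subtasks, one subtask returning early can stop further checkpoints from being taken (the gap FLIP-147 closes), which is why the thread says coordinating them needs an external tool.

```java
/**
 * Hypothetical helper (not part of Flink): lets a bounded legacy source return
 * from run() only after a checkpoint triggered AFTER its last record completed.
 * Covers one source subtask only; see the lead-in for the parallel case.
 */
final class FinalCheckpointGate {
    private static final long NONE = -1L;

    private boolean emissionDone = false;
    // Id of the first checkpoint snapshotted after the last record was emitted.
    private long finalCheckpointId = NONE;
    private boolean finalCheckpointCompleted = false;

    /** Call from run() right after emitting the last record. */
    synchronized void markEmissionDone() {
        emissionDone = true;
    }

    /** Call from snapshotState() with the id of the checkpoint being taken. */
    synchronized void onSnapshot(long checkpointId) {
        if (emissionDone && finalCheckpointId == NONE) {
            finalCheckpointId = checkpointId;
        }
    }

    /**
     * Call from notifyCheckpointComplete(). A later checkpoint id also counts,
     * because ids grow monotonically and a notification may be skipped.
     */
    synchronized void onCheckpointComplete(long checkpointId) {
        if (finalCheckpointId != NONE && checkpointId >= finalCheckpointId) {
            finalCheckpointCompleted = true;
            notifyAll(); // wake a thread blocked in awaitFinalCheckpoint()
        }
    }

    synchronized boolean isSafeToFinish() {
        return finalCheckpointCompleted;
    }

    /** Blocks until the final checkpoint completed; returns false on timeout. */
    synchronized boolean awaitFinalCheckpoint(long timeoutMillis) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (!finalCheckpointCompleted) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                return false;
            }
            wait(remaining); // releases this monitor so the callbacks can run
        }
        return true;
    }
}
```

A checkpoint that started before the last record does not open the gate, since its state would miss the tail of the input; only one triggered afterwards (or any later one) does.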
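Rakshit's remaining concern in the thread was referencing the previous day's checkpoint after the job ID changes. Retained checkpoints are written as `<checkpoint dir>/<job-id>/chk-<n>/`, with a `_metadata` file inside each completed one, and that `chk-<n>` directory is the kind of path you would hand to the restore parameter Arvid points to (for example `flink run -s <path>`). Below is a minimal sketch of how a scheduler might pick the newest completed checkpoint. It assumes the scheduler remembers the previous run's job ID and that the directory is visible as a local or mounted filesystem (for S3 you would list prefixes with an S3 client instead); `LatestRetainedCheckpoint` is a hypothetical helper, and treating "has `_metadata`" as "completed" is a heuristic.

```java
import java.io.File;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: finds the newest completed chk-N directory of one job. */
final class LatestRetainedCheckpoint {
    private static final Pattern CHK_DIR = Pattern.compile("chk-(\\d+)");

    static Optional<File> find(File jobDir) {
        File[] children = jobDir.listFiles();
        if (children == null) { // missing, not a directory, or unreadable
            return Optional.empty();
        }
        return Arrays.stream(children)
                .filter(File::isDirectory)
                .filter(d -> CHK_DIR.matcher(d.getName()).matches())
                // Heuristic: a chk-N directory without _metadata is unfinished or aborted.
                .filter(d -> new File(d, "_metadata").exists())
                // Compare numerically so that chk-10 sorts after chk-2.
                .max(Comparator.comparingLong(LatestRetainedCheckpoint::checkpointNumber));
    }

    private static long checkpointNumber(File dir) {
        Matcher m = CHK_DIR.matcher(dir.getName());
        if (!m.matches()) {
            throw new IllegalArgumentException("not a chk-N directory: " + dir);
        }
        return Long.parseLong(m.group(1));
    }
}
```

Directories such as `shared/` and `taskowned/` that sit next to the `chk-<n>` folders are ignored because they do not match the `chk-<n>` pattern.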