Thanks for the information Yun! I will go with the workaround then.

On Fri, 17 Sept 2021 at 15:22, Yun Gao <yungao...@aliyun.com> wrote:
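For reference, the coordination logic of that workaround (a custom source that keeps `run()` alive until one final checkpoint has completed, so the task only goes to FINISHED after its state is committed) can be sketched without any Flink dependencies. This is a minimal plain-Java sketch of just the latch; in a real job it would live inside a `SourceFunction` that also implements `CheckpointListener`, with `checkpointCompleted()` called from `notifyCheckpointComplete` and `awaitFinalCheckpoint()` called at the end of `run()`. The class and method names are made up for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * Sketch of a "wait for one final checkpoint" gate for a custom source.
 * checkpointCompleted() would be driven by
 * CheckpointListener#notifyCheckpointComplete; awaitFinalCheckpoint()
 * would block at the end of SourceFunction#run so the source does not
 * finish before its last data is covered by a completed checkpoint.
 */
class FinalCheckpointGate {
    private final CountDownLatch latch = new CountDownLatch(1);

    /** Call from notifyCheckpointComplete(long checkpointId). */
    void checkpointCompleted() {
        latch.countDown();
    }

    /**
     * Blocks until a checkpoint has completed (or the timeout expires).
     * Returns true if a checkpoint completed within the timeout.
     */
    boolean awaitFinalCheckpoint(long timeout, TimeUnit unit) throws InterruptedException {
        return latch.await(timeout, unit);
    }
}
```

Coordinating *all* source subtasks to exit after the *same* checkpoint still needs an external signal, as Arvid notes below; this gate only covers a single subtask.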
> Hi Rakshit,
>
> I think FLIP-147 might still not be able to support this case: for
> bounded jobs it lets each task exit after a checkpoint that commits the
> remaining data, but it cannot ensure that all the tasks exit after the
> same checkpoint; and it does not support taking a savepoint after all
> the tasks have finished.
>
> We are also thinking about whether we could support this case by some
> means. For now, I think we still need the workaround Arvid points out:
> coordinate all the sources to wait until a final checkpoint completes
> before exiting.
>
> Best,
> Yun
>
> ------------------Original Mail ------------------
> *Sender:* Rakshit Ramesh <rakshit.ram...@datakaveri.org>
> *Send Date:* Fri Sep 17 17:20:40 2021
> *Recipients:* Arvid Heise <ar...@apache.org>
> *CC:* user <user@flink.apache.org>
> *Subject:* Re: Savepoints with bootstraping a datastream function
>
>> Hi Arvid.
>> I went through the code, the Confluence page, and the Jira tickets on
>> FLIP-147. I couldn't determine whether it's possible to manually
>> trigger a savepoint/checkpoint, as I couldn't find any Javadoc APIs
>> for it.
>> Also, would the setting ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH still
>> create a checkpoint if my entire bounded job finishes before the
>> checkpoint interval?
>>
>> On Tue, 13 Jul 2021 at 17:58, Rakshit Ramesh <rakshit.ram...@datakaveri.org> wrote:
>>
>>> That's great news!
>>> Thanks.
>>>
>>> On Tue, 13 Jul 2021 at 14:17, Arvid Heise <ar...@apache.org> wrote:
>>>
>>>> The only known workaround is to provide your own source (function)
>>>> that doesn't finish until all of the source subtasks finish and a
>>>> final checkpoint is completed. However, coordinating the sources
>>>> with the old SourceFunction interface requires some external tool.
>>>>
>>>> FLIP-147 is targeted for 1.14 in August.
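For reference, the setting mentioned above has to be switched on explicitly once 1.14 is out. A minimal sketch of enabling it, assuming the option is exposed as `ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH` (the name used in the FLIP; the released name may differ):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointsAfterFinishConfig {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Opt in to FLIP-147 behaviour (assumed option name, per the FLIP):
        conf.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.enableCheckpointing(10_000); // checkpoint every 10 s

        // ... build the bounded pipeline here ...
        env.execute("bounded-job-with-final-checkpoints");
    }
}
```

Note that per Yun's answer this still does not guarantee that all tasks exit after the same checkpoint, nor a savepoint after all tasks finish.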
>>>>
>>>> On Sat, Jul 10, 2021 at 7:46 PM Rakshit Ramesh <rakshit.ram...@datakaveri.org> wrote:
>>>>
>>>>> Hi Arvid,
>>>>> Since I'm trying to save checkpoints for a bounded process, the
>>>>> checkpoint isn't being written in time: the job finishes before
>>>>> that can happen.
>>>>>
>>>>> Looks like one major feature required for this to work is
>>>>> discussed in FLIP-147:
>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished
>>>>>
>>>>> Is there any userland workaround for this?
>>>>>
>>>>> Thanks!
>>>>>
>>>>> On Thu, 8 Jul 2021 at 11:52, Rakshit Ramesh <rakshit.ram...@datakaveri.org> wrote:
>>>>>
>>>>>> Yes! I was only worried about the job id changing and the
>>>>>> checkpoint becoming un-referenceable.
>>>>>> But since I can pass a path to the checkpoint, that will not be
>>>>>> an issue.
>>>>>>
>>>>>> Thanks a lot for your suggestions!
>>>>>>
>>>>>> On Thu, 8 Jul 2021 at 11:26, Arvid Heise <ar...@apache.org> wrote:
>>>>>>
>>>>>>> Hi Rakshit,
>>>>>>>
>>>>>>> It sounds to me as if you don't need the Savepoint API at all.
>>>>>>> You can (re)start all applications with the previous state (be
>>>>>>> it a retained checkpoint or a savepoint). You just need to
>>>>>>> provide the path to it in your application invocation [1] (every
>>>>>>> entry point has such a parameter; you might need to check the
>>>>>>> respective documentation if you are not using the CLI). Note
>>>>>>> that although it only says savepoint, starting from a checkpoint
>>>>>>> is fine as well (just not recommended in the beginning).
>>>>>>>
>>>>>>> [1]
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/cli/#starting-a-job-from-a-savepoint
>>>>>>>
>>>>>>> On Thu, Jul 8, 2021 at 6:31 AM Rakshit Ramesh <rakshit.ram...@datakaveri.org> wrote:
>>>>>>>
>>>>>>>> Sorry for being a little vague there.
>>>>>>>> I want to create a savepoint from a DataStream right before the
>>>>>>>> job is finished or cancelled.
>>>>>>>> What you have shown in the IT case is how a DataStream can be
>>>>>>>> bootstrapped with state that is formed by means of a DataSet.
>>>>>>>> My jobs are triggered by a scheduler periodically (every day)
>>>>>>>> using the API, and I would like to bootstrap each day's job
>>>>>>>> with the state of the previous day.
>>>>>>>>
>>>>>>>> But thanks for the input on the checkpoint behaviour wrt a
>>>>>>>> FINISHED state; I think that will work for me.
>>>>>>>>
>>>>>>>> Thanks!
>>>>>>>>
>>>>>>>> On Thu, 8 Jul 2021 at 02:03, Arvid Heise <ar...@apache.org> wrote:
>>>>>>>>
>>>>>>>>> I don't quite understand your question. You use the Savepoint
>>>>>>>>> API to create a savepoint with a batch job (that's why it's a
>>>>>>>>> DataSet transform currently). That savepoint can only be
>>>>>>>>> restored through a DataStream application. DataSet
>>>>>>>>> applications cannot start from a savepoint.
>>>>>>>>>
>>>>>>>>> So I don't understand why you see a difference between
>>>>>>>>> "restoring a savepoint to a datastream" and "creating a
>>>>>>>>> NewSavepoint for a datastream". It's ultimately the same thing
>>>>>>>>> to me. Just to be very clear: the main purpose of the
>>>>>>>>> Savepoint API is to create the initial state of a DataStream
>>>>>>>>> application.
>>>>>>>>>
>>>>>>>>> For your second question: yes, retained checkpoints outlive
>>>>>>>>> the job in all regards. It's the user's responsibility to
>>>>>>>>> eventually clean them up.
>>>>>>>>>
>>>>>>>>> On Wed, Jul 7, 2021 at 6:56 PM Rakshit Ramesh <rakshit.ram...@datakaveri.org> wrote:
>>>>>>>>>
>>>>>>>>>> Yes, I could understand restoring a savepoint to a
>>>>>>>>>> DataStream. What I couldn't figure out is how to create a
>>>>>>>>>> NewSavepoint for a DataStream.
>>>>>>>>>> What I understand is that a NewSavepoint only takes in
>>>>>>>>>> bootstrap transformations built from DataSet transform
>>>>>>>>>> functions.
>>>>>>>>>>
>>>>>>>>>> About the checkpoints: does
>>>>>>>>>> CheckpointConfig.ExternalizedCheckpointCleanup =
>>>>>>>>>> RETAIN_ON_CANCELLATION
>>>>>>>>>> offer the same behaviour when the job is "FINISHED" and not
>>>>>>>>>> "CANCELLED"?
>>>>>>>>>>
>>>>>>>>>> What I'm looking for is a way to retain the state of a
>>>>>>>>>> bounded job so that the state is reloaded on the next job run
>>>>>>>>>> (through the API).
>>>>>>>>>>
>>>>>>>>>> On Wed, 7 Jul 2021 at 14:18, Arvid Heise <ar...@apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Rakshit,
>>>>>>>>>>>
>>>>>>>>>>> The example is valid. The State Processor API works somewhat
>>>>>>>>>>> like a DataSet application, but the state is meant to be
>>>>>>>>>>> read in DataStream. Please check out the
>>>>>>>>>>> SavepointWriterITCase [1] for a full example. There is no
>>>>>>>>>>> checkpoint/savepoint in DataSet applications.
>>>>>>>>>>>
>>>>>>>>>>> Checkpoints can be stored on different checkpoint storages,
>>>>>>>>>>> such as S3 or HDFS. If you use the RocksDB state backend,
>>>>>>>>>>> Flink pretty much just copies the SST files of RocksDB to
>>>>>>>>>>> S3. Checkpoints are usually bound to the life of an
>>>>>>>>>>> application: they are created by the application and deleted
>>>>>>>>>>> on termination.
>>>>>>>>>>> However, you can resume an application from both savepoints
>>>>>>>>>>> and checkpoints. Checkpoints can be retained [2] to avoid
>>>>>>>>>>> them being deleted by the application during termination.
>>>>>>>>>>> But that's considered an advanced feature, and you should
>>>>>>>>>>> first try it with savepoints.
>>>>>>>>>>>
>>>>>>>>>>> [1]
>>>>>>>>>>> https://github.com/apache/flink/blob/release-1.13.0/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java#L141-L141
>>>>>>>>>>> [2]
>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/state/checkpoints/#retained-checkpoints
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Jul 5, 2021 at 5:56 PM Rakshit Ramesh <rakshit.ram...@datakaveri.org> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I'm trying to bootstrap state into a KeyedProcessFunction
>>>>>>>>>>>> equivalent that takes in a DataStream, but I'm unable to
>>>>>>>>>>>> find a reference for the same.
>>>>>>>>>>>> I found this gist
>>>>>>>>>>>> https://gist.github.com/alpinegizmo/ff3d2e748287853c88f21259830b29cf
>>>>>>>>>>>> but it seems to only apply to DataSet.
>>>>>>>>>>>> My use case is to manually trigger a savepoint into S3 for
>>>>>>>>>>>> later reuse.
>>>>>>>>>>>> I'm also guessing that checkpoints can't be stored in
>>>>>>>>>>>> RocksDB or S3 for later reuse.
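For the record, the DataSet-based bootstrap that the gist and SavepointWriterITCase demonstrate looks roughly like the sketch below against the 1.13-era State Processor API. The operator uid, the S3 path, and the `TotalsBootstrapper` function are made up for illustration; the written savepoint would then be handed to the next day's DataStream job via the savepoint-path mechanism Arvid describes, with the keyed operator carrying the matching `uid("totals-uid")`.

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;

public class BootstrapSavepointJob {

    /** Writes each incoming (key, total) pair into keyed ValueState. */
    static class TotalsBootstrapper
            extends KeyedStateBootstrapFunction<String, Tuple2<String, Long>> {
        private transient ValueState<Long> total;

        @Override
        public void open(Configuration parameters) {
            total = getRuntimeContext()
                    .getState(new ValueStateDescriptor<>("total", Long.class));
        }

        @Override
        public void processElement(Tuple2<String, Long> value, Context ctx) throws Exception {
            total.update(value.f1);
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Stand-in for the previous day's results (e.g. read from S3 instead).
        DataSet<Tuple2<String, Long>> previousTotals =
                env.fromElements(Tuple2.of("deviceA", 42L), Tuple2.of("deviceB", 7L));

        BootstrapTransformation<Tuple2<String, Long>> transformation =
                OperatorTransformation
                        .bootstrapWith(previousTotals)
                        .keyBy(t -> t.f0)
                        .transform(new TotalsBootstrapper());

        Savepoint.create(new MemoryStateBackend(), 128) // 128 = max parallelism
                .withOperator("totals-uid", transformation)
                .write("s3://my-bucket/bootstrap-savepoint"); // hypothetical path

        env.execute("bootstrap-savepoint");
    }
}
```

The next run would then be started with something like `flink run -s s3://my-bucket/bootstrap-savepoint ...`, per the CLI documentation linked above.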