Hi Arvid.
I went through the code, confluence and jira on FLIP-147.
I couldn't determine if it's possible to manually trigger a
savepoint/checkpoint as
I couldn't find any javadoc apis for the same.
Also, would the setting "ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH" still
create a checkpoint
if my entire bounded job finishes before the checkpoint interval?

On Tue, 13 Jul 2021 at 17:58, Rakshit Ramesh <rakshit.ram...@datakaveri.org>
wrote:

> That's great news!
> Thanks.
>
> On Tue, 13 Jul 2021 at 14:17, Arvid Heise <ar...@apache.org> wrote:
>
>> The only known workaround is to provide your own source(function) that
>> doesn't finish until all of the source subtasks finish and a final
>> checkpoint is completed. However, coordinating the sources with the old
>> SourceFunction interface requires some external tool.
>>
>> FLIP-147 is targeted for 1.14 in August.
>>
>> On Sat, Jul 10, 2021 at 7:46 PM Rakshit Ramesh <
>> rakshit.ram...@datakaveri.org> wrote:
>>
>>> Hi Arvid,
>>> Since I'm trying to save checkpoints for a bounded process
>>> the checkpoint isn't being written on time since the job finishes before
>>> that can happen.
>>>
>>> Looks like one major feature that would be required for this to work is
>>> discussed in FLIP-147
>>>
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished
>>>
>>> Is there any userland workaround for this?
>>>
>>> Thanks!
>>>
>>> On Thu, 8 Jul 2021 at 11:52, Rakshit Ramesh <
>>> rakshit.ram...@datakaveri.org> wrote:
>>>
>>>> Yes! I was only worried about the jobid changing and the checkpoint
>>>> being un-referenceable.
>>>> But since I can pass a path to the checkpoint that will not be an issue.
>>>>
>>>>
>>>> Thanks a lot for your suggestions!
>>>>
>>>> On Thu, 8 Jul 2021 at 11:26, Arvid Heise <ar...@apache.org> wrote:
>>>>
>>>>> Hi Rakshit,
>>>>>
>>>>> It sounds to me as if you don't need the Savepoint API at all. You can
>>>>> (re)start all applications with the previous state (be it retained
>>>>> checkpoint or savepoint). You just need to provide the path to that in 
>>>>> your
>>>>> application invocation [1] (every entry point has such a parameter, you
>>>>> might need to check the respective documentation if you are not using 
>>>>> CLI).
>>>>> Note that although it only says savepoint, starting from a checkpoint is
>>>>> fine as well (just not recommended in the beginning).
>>>>>
>>>>> [1]
>>>>> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/cli/#starting-a-job-from-a-savepoint
>>>>>
>>>>> On Thu, Jul 8, 2021 at 6:31 AM Rakshit Ramesh <
>>>>> rakshit.ram...@datakaveri.org> wrote:
>>>>>
>>>>>> Sorry for being a little vague there.
>>>>>> I want to create a Savepoint from a DataStream right before the job
>>>>>> is finished or cancelled.
>>>>>> What you have shown in the IT case is how a datastream can be
>>>>>> bootstrapped with state that is
>>>>>> formed formed by means of DataSet.
>>>>>> My jobs are triggered by a scheduler periodically (every day) using
>>>>>> the api and I would like
>>>>>> to bootstrap each day's job with the state of the previous day.
>>>>>>
>>>>>> But thanks for the input on the Checkpoint behaviour wrt a FINISHED
>>>>>> state,
>>>>>> I think that will work for me.
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>> On Thu, 8 Jul 2021 at 02:03, Arvid Heise <ar...@apache.org> wrote:
>>>>>>
>>>>>>> I don't quite understand your question. You use Savepoint API to
>>>>>>> create a savepoint with a batch job (that's why it's DataSet Transform
>>>>>>> currently). That savepoint can only be restored through a datastream
>>>>>>> application. Dataset applications cannot start from a savepoint.
>>>>>>>
>>>>>>> So I don't understand why you see a difference between "restoring a
>>>>>>> savepoint to a datastream" and "create a NewSavepoint for a datastream".
>>>>>>> It's ultimately the same thing for me. Just to be very clear: the main
>>>>>>> purpose of Savepoint API is to create the initial state of a datastream
>>>>>>> application.
>>>>>>>
>>>>>>> For your second question, yes retained checkpoints outlive the job
>>>>>>> in all regards. It's the users responsibility to eventually clean that 
>>>>>>> up.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Jul 7, 2021 at 6:56 PM Rakshit Ramesh <
>>>>>>> rakshit.ram...@datakaveri.org> wrote:
>>>>>>>
>>>>>>>> Yes I could understand restoring a savepoint to a datastream.
>>>>>>>> What I couldn't figure out is to create a NewSavepoint for a
>>>>>>>> datastream.
>>>>>>>> What I understand is that NewSavepoints only take in Bootstrap
>>>>>>>> transformation for Dataset Transform functions.
>>>>>>>>
>>>>>>>>
>>>>>>>> About the checkpoints, does
>>>>>>>>  CheckpointConfig.ExternalizedCheckpointCleanup =
>>>>>>>> RETAIN_ON_CANCELLATION
>>>>>>>> offer the same behaviour when the job is "FINISHED" and not
>>>>>>>> "CANCELLED" ?
>>>>>>>>
>>>>>>>> What I'm looking for is a way to retain the state for a bounded job
>>>>>>>> so that the state is reloaded on the next job run (through api).
>>>>>>>>
>>>>>>>> On Wed, 7 Jul 2021 at 14:18, Arvid Heise <ar...@apache.org> wrote:
>>>>>>>>
>>>>>>>>> Hi Rakshit,
>>>>>>>>>
>>>>>>>>> The example is valid. The state processor API is kinda working
>>>>>>>>> like a DataSet application but the state is meant to be read in 
>>>>>>>>> DataStream.
>>>>>>>>> Please check out the SavepointWriterITCase [1] for a full example. 
>>>>>>>>> There is
>>>>>>>>> no checkpoint/savepoint in DataSet applications.
>>>>>>>>>
>>>>>>>>> Checkpoints can be stored on different checkpoint storages, such
>>>>>>>>> as S3 or HDFS. If you use RocksDB state backend, Flink pretty much 
>>>>>>>>> just
>>>>>>>>> copy the SST files of RocksDB to S3. Checkpoints are usually bound to 
>>>>>>>>> the
>>>>>>>>> life of an application. So they are created by the application and 
>>>>>>>>> deleted
>>>>>>>>> on termination.
>>>>>>>>> However, you can resume an application both from savepoint and
>>>>>>>>> checkpoints. Checkpoints can be retained [2] to avoid them being 
>>>>>>>>> deleted by
>>>>>>>>> the application during termination. But that's considered an advanced
>>>>>>>>> feature and you should first try it with savepoints.
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://github.com/apache/flink/blob/release-1.13.0/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java#L141-L141
>>>>>>>>> [2]
>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/state/checkpoints/#retained-checkpoints
>>>>>>>>>
>>>>>>>>> On Mon, Jul 5, 2021 at 5:56 PM Rakshit Ramesh <
>>>>>>>>> rakshit.ram...@datakaveri.org> wrote:
>>>>>>>>>
>>>>>>>>>> I'm trying to bootstrap state into a KeyedProcessFunction
>>>>>>>>>> equivalent that takes in
>>>>>>>>>> a DataStream but I'm unable to find a reference for the same.
>>>>>>>>>> I found this gist
>>>>>>>>>>
>>>>>>>>>> https://gist.github.com/alpinegizmo/ff3d2e748287853c88f21259830b29cf
>>>>>>>>>> But it seems to only apply for DataSet.
>>>>>>>>>> My usecase is to manually trigger a Savepoint into s3 for later
>>>>>>>>>> reuse.
>>>>>>>>>> I'm also guessing that checkpoints can't be stored in rocksdb or
>>>>>>>>>> s3 for later reuse.
>>>>>>>>>>
>>>>>>>>>

Reply via email to