Hi Rakshit,

I think FLIP-147 might still not be able to support this case, since
for bounded jobs, it supports each task exit after a checkpoint to 
commit the remaining data, but it could not ensures all the tasks
exit after the same checkpoint; for savepoint, it could not supporting
taking a savepoint after all the tasks are finished. 

We are also thinking on if we could support this case with some methods.
For now, I think we may still need the workaround method as Arvid points out,
that we coordinate all the sources to wait till a final checkpoint is completed
before exit. 

Best,
yun



 ------------------Original Mail ------------------
Sender:Rakshit Ramesh <rakshit.ram...@datakaveri.org>
Send Date:Fri Sep 17 17:20:40 2021
Recipients:Arvid Heise <ar...@apache.org>
CC:user <user@flink.apache.org>
Subject:Re: Savepoints with bootstraping a datastream function

Hi Arvid.
I went through the code, confluence and jira on FLIP-147.
I couldn't determine if it's possible to manually trigger a 
savepoint/checkpoint as 
I couldn't find any javadoc apis for the same.
Also, would the setting "ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH" still create a 
checkpoint
if my entire bounded job finishes before the checkpoint interval? 

On Tue, 13 Jul 2021 at 17:58, Rakshit Ramesh <rakshit.ram...@datakaveri.org> 
wrote:

That's great news!
Thanks.

On Tue, 13 Jul 2021 at 14:17, Arvid Heise <ar...@apache.org> wrote:

The only known workaround is to provide your own source(function) that doesn't 
finish until all of the source subtasks finish and a final checkpoint is 
completed. However, coordinating the sources with the old SourceFunction 
interface requires some external tool.

FLIP-147 is targeted for 1.14 in August.

On Sat, Jul 10, 2021 at 7:46 PM Rakshit Ramesh <rakshit.ram...@datakaveri.org> 
wrote:

Hi Arvid,
Since I'm trying to save checkpoints for a bounded process
the checkpoint isn't being written on time since the job finishes before that 
can happen.

Looks like one major feature that would be required for this to work is 
discussed in FLIP-147
https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished

Is there any userland workaround for this?

Thanks!

On Thu, 8 Jul 2021 at 11:52, Rakshit Ramesh <rakshit.ram...@datakaveri.org> 
wrote:

Yes! I was only worried about the jobid changing and the checkpoint being 
un-referenceable. 
But since I can pass a path to the checkpoint that will not be an issue.


Thanks a lot for your suggestions!

On Thu, 8 Jul 2021 at 11:26, Arvid Heise <ar...@apache.org> wrote:

Hi Rakshit,

It sounds to me as if you don't need the Savepoint API at all. You can 
(re)start all applications with the previous state (be it retained checkpoint 
or savepoint). You just need to provide the path to that in your application 
invocation [1] (every entry point has such a parameter, you might need to check 
the respective documentation if you are not using CLI). Note that although it 
only says savepoint, starting from a checkpoint is fine as well (just not 
recommended in the beginning).

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/cli/#starting-a-job-from-a-savepoint
On Thu, Jul 8, 2021 at 6:31 AM Rakshit Ramesh <rakshit.ram...@datakaveri.org> 
wrote:

Sorry for being a little vague there.
I want to create a Savepoint from a DataStream right before the job is finished 
or cancelled.
What you have shown in the IT case is how a datastream can be bootstrapped with 
state that is
formed formed by means of DataSet. 
My jobs are triggered by a scheduler periodically (every day) using the api and 
I would like 
to bootstrap each day's job with the state of the previous day.

But thanks for the input on the Checkpoint behaviour wrt a FINISHED state, 
I think that will work for me.

Thanks!

On Thu, 8 Jul 2021 at 02:03, Arvid Heise <ar...@apache.org> wrote:

I don't quite understand your question. You use Savepoint API to create a 
savepoint with a batch job (that's why it's DataSet Transform currently). That 
savepoint can only be restored through a datastream application. Dataset 
applications cannot start from a savepoint.

So I don't understand why you see a difference between "restoring a savepoint 
to a datastream" and "create a NewSavepoint for a datastream". It's ultimately 
the same thing for me. Just to be very clear: the main purpose of Savepoint API 
is to create the initial state of a datastream application.

For your second question, yes retained checkpoints outlive the job in all 
regards. It's the users responsibility to eventually clean that up.



On Wed, Jul 7, 2021 at 6:56 PM Rakshit Ramesh <rakshit.ram...@datakaveri.org> 
wrote:

Yes I could understand restoring a savepoint to a datastream.
What I couldn't figure out is to create a NewSavepoint for a datastream.
What I understand is that NewSavepoints only take in Bootstrap transformation 
for Dataset Transform functions.


About the checkpoints, does 
 CheckpointConfig.ExternalizedCheckpointCleanup = RETAIN_ON_CANCELLATION 
offer the same behaviour when the job is "FINISHED" and not "CANCELLED" ?

What I'm looking for is a way to retain the state for a bounded job so that the 
state is reloaded on the next job run (through api).

On Wed, 7 Jul 2021 at 14:18, Arvid Heise <ar...@apache.org> wrote:

Hi Rakshit,

The example is valid. The state processor API is kinda working like a DataSet 
application but the state is meant to be read in DataStream. Please check out 
the SavepointWriterITCase [1] for a full example. There is no 
checkpoint/savepoint in DataSet applications.

Checkpoints can be stored on different checkpoint storages, such as S3 or HDFS. 
If you use RocksDB state backend, Flink pretty much just copy the SST files of 
RocksDB to S3. Checkpoints are usually bound to the life of an application. So 
they are created by the application and deleted on termination.
However, you can resume an application both from savepoint and checkpoints. 
Checkpoints can be retained [2] to avoid them being deleted by the application 
during termination. But that's considered an advanced feature and you should 
first try it with savepoints.

[1] 
https://github.com/apache/flink/blob/release-1.13.0/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java#L141-L141
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/state/checkpoints/#retained-checkpoints
On Mon, Jul 5, 2021 at 5:56 PM Rakshit Ramesh <rakshit.ram...@datakaveri.org> 
wrote:

I'm trying to bootstrap state into a KeyedProcessFunction equivalent that takes 
in 
a DataStream but I'm unable to find a reference for the same.
I found this gist 
https://gist.github.com/alpinegizmo/ff3d2e748287853c88f21259830b29cf
But it seems to only apply for DataSet.
My usecase is to manually trigger a Savepoint into s3 for later reuse.
I'm also guessing that checkpoints can't be stored in rocksdb or s3 for later 
reuse.

Reply via email to