Thanks for the information, Yun!
I will go with the workaround then.
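
For reference, here is a minimal sketch of the coordination logic behind that workaround, assuming all source subtasks can reach one shared gate. It is plain Java with no Flink dependency. `FinalCheckpointGate` and its methods are hypothetical names, not Flink APIs. In a real job the completion hook would presumably be driven from something like `notifyCheckpointComplete`, and subtasks on different TaskManagers would need an external tool to share this state, as Arvid notes below.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper (not a Flink class): keeps bounded sources alive until a final checkpoint completes. */
final class FinalCheckpointGate {
    private final CountDownLatch sourcesStillEmitting;
    private final CountDownLatch released = new CountDownLatch(1);
    // Id of the first checkpoint triggered after every source has drained; -1 until then.
    private long firstFinalCheckpointId = -1;

    FinalCheckpointGate(int sourceParallelism) {
        this.sourcesStillEmitting = new CountDownLatch(sourceParallelism);
    }

    /** A source subtask calls this once, after emitting its last record. */
    void markSourceDrained() {
        sourcesStillEmitting.countDown();
    }

    /** Call when a checkpoint is triggered; remembers the first one that covers all data. */
    synchronized void onCheckpointTriggered(long checkpointId) {
        if (firstFinalCheckpointId < 0 && sourcesStillEmitting.getCount() == 0) {
            firstFinalCheckpointId = checkpointId;
        }
    }

    /** Call when a checkpoint completes; releases the sources if it was a final one. */
    synchronized void onCheckpointCompleted(long checkpointId) {
        if (firstFinalCheckpointId >= 0 && checkpointId >= firstFinalCheckpointId) {
            released.countDown();
        }
    }

    boolean isReleased() {
        return released.getCount() == 0;
    }

    /** Sources block here instead of returning from their run loop. */
    boolean awaitRelease(long timeout, TimeUnit unit) throws InterruptedException {
        return released.await(timeout, unit);
    }

    public static void main(String[] args) throws InterruptedException {
        int parallelism = 3;
        FinalCheckpointGate gate = new FinalCheckpointGate(parallelism);
        Thread[] sources = new Thread[parallelism];
        for (int i = 0; i < parallelism; i++) {
            final int subtask = i;
            sources[i] = new Thread(() -> {
                try {
                    Thread.sleep(10L * (subtask + 1)); // pretend to emit bounded input
                    gate.markSourceDrained();
                    gate.awaitRelease(5, TimeUnit.SECONDS); // stay alive until the final checkpoint
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            sources[i].start();
        }
        // Stand-in for periodic checkpointing.
        long id = 0;
        while (!gate.isReleased()) {
            id++;
            gate.onCheckpointTriggered(id);
            Thread.sleep(5);
            gate.onCheckpointCompleted(id);
        }
        for (Thread t : sources) {
            t.join();
        }
        System.out.println("sources exited after checkpoint " + id);
    }
}
```

The gate only releases on a checkpoint that was triggered after the last source drained, since an earlier in-flight checkpoint might not contain the final records.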

On Fri, 17 Sept 2021 at 15:22, Yun Gao <yungao...@aliyun.com> wrote:

> Hi Rakshit,
>
> I think FLIP-147 might still not be able to support this case. For
> bounded jobs, it lets each task wait for a checkpoint before exiting so
> that the remaining data is committed, but it cannot ensure that all the
> tasks exit after the same checkpoint. As for savepoints, it does not
> support taking a savepoint after all the tasks have finished.
>
> We are also thinking about whether we could support this case in some way.
> For now, I think we may still need the workaround that Arvid pointed out:
> coordinate all the sources to wait until a final checkpoint has completed
> before they exit.
>
> Best,
> yun
>
>
> ------------------Original Mail ------------------
> *Sender:*Rakshit Ramesh <rakshit.ram...@datakaveri.org>
> *Send Date:*Fri Sep 17 17:20:40 2021
> *Recipients:*Arvid Heise <ar...@apache.org>
> *CC:*user <user@flink.apache.org>
> *Subject:*Re: Savepoints with bootstraping a datastream function
>
>> Hi Arvid.
>> I went through the code, Confluence and Jira on FLIP-147.
>> I couldn't determine whether it's possible to manually trigger a
>> savepoint/checkpoint, as I couldn't find any Javadoc APIs for it.
>> Also, would the setting "ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH" still
>> create a checkpoint if my entire bounded job finishes before the
>> checkpoint interval?
>>
>> On Tue, 13 Jul 2021 at 17:58, Rakshit Ramesh <
>> rakshit.ram...@datakaveri.org> wrote:
>>
>>> That's great news!
>>> Thanks.
>>>
>>> On Tue, 13 Jul 2021 at 14:17, Arvid Heise <ar...@apache.org> wrote:
>>>
>>>> The only known workaround is to provide your own source(function) that
>>>> doesn't finish until all of the source subtasks finish and a final
>>>> checkpoint is completed. However, coordinating the sources with the old
>>>> SourceFunction interface requires some external tool.
>>>>
>>>> FLIP-147 is targeted for 1.14 in August.
>>>>
>>>> On Sat, Jul 10, 2021 at 7:46 PM Rakshit Ramesh <
>>>> rakshit.ram...@datakaveri.org> wrote:
>>>>
>>>>> Hi Arvid,
>>>>> I'm trying to save checkpoints for a bounded job, but the checkpoint
>>>>> isn't being written in time because the job finishes before that can
>>>>> happen.
>>>>>
>>>>> Looks like one major feature that would be required for this to work
>>>>> is discussed in FLIP-147
>>>>>
>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished
>>>>>
>>>>> Is there any userland workaround for this?
>>>>>
>>>>> Thanks!
>>>>>
>>>>> On Thu, 8 Jul 2021 at 11:52, Rakshit Ramesh <
>>>>> rakshit.ram...@datakaveri.org> wrote:
>>>>>
>>>>>> Yes! I was only worried about the jobid changing and the checkpoint
>>>>>> being un-referenceable.
>>>>>> But since I can pass a path to the checkpoint that will not be an
>>>>>> issue.
>>>>>>
>>>>>>
>>>>>> Thanks a lot for your suggestions!
>>>>>>
>>>>>> On Thu, 8 Jul 2021 at 11:26, Arvid Heise <ar...@apache.org> wrote:
>>>>>>
>>>>>>> Hi Rakshit,
>>>>>>>
>>>>>>> It sounds to me as if you don't need the Savepoint API at all. You
>>>>>>> can (re)start all applications with the previous state (be it a
>>>>>>> retained checkpoint or a savepoint). You just need to provide the path
>>>>>>> to it in your application invocation [1] (every entry point has such a
>>>>>>> parameter; you might need to check the respective documentation if you
>>>>>>> are not using the CLI). Note that although it only says savepoint,
>>>>>>> starting from a checkpoint is fine as well (just not recommended in
>>>>>>> the beginning).
>>>>>>>
>>>>>>> [1]
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/cli/#starting-a-job-from-a-savepoint
>>>>>>>
>>>>>>> On Thu, Jul 8, 2021 at 6:31 AM Rakshit Ramesh <
>>>>>>> rakshit.ram...@datakaveri.org> wrote:
>>>>>>>
>>>>>>>> Sorry for being a little vague there.
>>>>>>>> I want to create a Savepoint from a DataStream right before the job
>>>>>>>> is finished or cancelled.
>>>>>>>> What you have shown in the IT case is how a DataStream can be
>>>>>>>> bootstrapped with state that is formed by means of DataSet.
>>>>>>>> My jobs are triggered by a scheduler periodically (every day) using
>>>>>>>> the API, and I would like to bootstrap each day's job with the state
>>>>>>>> of the previous day.
>>>>>>>>
>>>>>>>> But thanks for the input on the Checkpoint behaviour wrt a FINISHED
>>>>>>>> state,
>>>>>>>> I think that will work for me.
>>>>>>>>
>>>>>>>> Thanks!
>>>>>>>>
>>>>>>>> On Thu, 8 Jul 2021 at 02:03, Arvid Heise <ar...@apache.org> wrote:
>>>>>>>>
>>>>>>>>> I don't quite understand your question. You use Savepoint API to
>>>>>>>>> create a savepoint with a batch job (that's why it's DataSet Transform
>>>>>>>>> currently). That savepoint can only be restored through a datastream
>>>>>>>>> application. Dataset applications cannot start from a savepoint.
>>>>>>>>>
>>>>>>>>> So I don't understand why you see a difference between "restoring
>>>>>>>>> a savepoint to a datastream" and "create a NewSavepoint for a
>>>>>>>>> datastream". It's ultimately the same thing for me. Just to be very
>>>>>>>>> clear: the main purpose of Savepoint API is to create the initial
>>>>>>>>> state of a datastream application.
>>>>>>>>>
>>>>>>>>> For your second question: yes, retained checkpoints outlive the job
>>>>>>>>> in all regards. It's the user's responsibility to eventually clean
>>>>>>>>> them up.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Jul 7, 2021 at 6:56 PM Rakshit Ramesh <
>>>>>>>>> rakshit.ram...@datakaveri.org> wrote:
>>>>>>>>>
>>>>>>>>>> Yes, I could understand restoring a savepoint to a datastream.
>>>>>>>>>> What I couldn't figure out is how to create a NewSavepoint for a
>>>>>>>>>> datastream.
>>>>>>>>>> What I understand is that NewSavepoints only take in Bootstrap
>>>>>>>>>> transformations for Dataset Transform functions.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> About the checkpoints, does
>>>>>>>>>>  CheckpointConfig.ExternalizedCheckpointCleanup =
>>>>>>>>>> RETAIN_ON_CANCELLATION
>>>>>>>>>> offer the same behaviour when the job is "FINISHED" and not
>>>>>>>>>> "CANCELLED" ?
>>>>>>>>>>
>>>>>>>>>> What I'm looking for is a way to retain the state for a bounded
>>>>>>>>>> job so that the state is reloaded on the next job run (through api).
>>>>>>>>>>
>>>>>>>>>> On Wed, 7 Jul 2021 at 14:18, Arvid Heise <ar...@apache.org>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Rakshit,
>>>>>>>>>>>
>>>>>>>>>>> The example is valid. The state processor API is kinda working
>>>>>>>>>>> like a DataSet application but the state is meant to be read in
>>>>>>>>>>> DataStream. Please check out the SavepointWriterITCase [1] for a
>>>>>>>>>>> full example. There is no checkpoint/savepoint in DataSet
>>>>>>>>>>> applications.
>>>>>>>>>>>
>>>>>>>>>>> Checkpoints can be stored on different checkpoint storages, such
>>>>>>>>>>> as S3 or HDFS. If you use the RocksDB state backend, Flink pretty
>>>>>>>>>>> much just copies the SST files of RocksDB to S3. Checkpoints are
>>>>>>>>>>> usually bound to the life of an application, so they are created by
>>>>>>>>>>> the application and deleted on termination.
>>>>>>>>>>> However, you can resume an application from both savepoints and
>>>>>>>>>>> checkpoints. Checkpoints can be retained [2] to avoid them being
>>>>>>>>>>> deleted by the application during termination. But that's
>>>>>>>>>>> considered an advanced feature and you should first try it with
>>>>>>>>>>> savepoints.
>>>>>>>>>>>
>>>>>>>>>>> [1]
>>>>>>>>>>> https://github.com/apache/flink/blob/release-1.13.0/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java#L141-L141
>>>>>>>>>>> [2]
>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/state/checkpoints/#retained-checkpoints
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Jul 5, 2021 at 5:56 PM Rakshit Ramesh <
>>>>>>>>>>> rakshit.ram...@datakaveri.org> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I'm trying to bootstrap state into a KeyedProcessFunction
>>>>>>>>>>>> equivalent that takes in
>>>>>>>>>>>> a DataStream but I'm unable to find a reference for the same.
>>>>>>>>>>>> I found this gist
>>>>>>>>>>>>
>>>>>>>>>>>> https://gist.github.com/alpinegizmo/ff3d2e748287853c88f21259830b29cf
>>>>>>>>>>>> But it seems to apply only to DataSet.
>>>>>>>>>>>> My use case is to manually trigger a Savepoint into S3 for later
>>>>>>>>>>>> reuse.
>>>>>>>>>>>> I'm also guessing that checkpoints can't be stored in rocksdb
>>>>>>>>>>>> or s3 for later reuse.
>>>>>>>>>>>>
>>>>>>>>>>>
