One approach is to have a separate cluster job manager program in Kubernetes that actually manages the jobs, so that you decouple job control from the jobs themselves. When restarting a job, follow these steps:

a) The job manager program first takes a savepoint while cancelling the job, and notes down the savepoint path returned by the savepoint REST API.
b) The job manager program then starts the new job, supplying that savepoint path, so the job resumes from the latest savepoint. That way you do not need to rely on the YAML configuration. A sketch of this flow is below.
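Here is a minimal sketch of that flow against Flink's monitoring REST API (the endpoints used below are the async savepoint endpoints that exist in Flink 1.8/1.9; the service URL, jar id, target directory, and poll interval are placeholder assumptions):

    import json
    import time
    import urllib.request

    FLINK_URL = "http://flink-jobmanager:8081"  # placeholder service address

    def _call(method, path, body=None):
        """Small helper around urllib for Flink's JSON REST API."""
        data = json.dumps(body).encode() if body is not None else None
        req = urllib.request.Request(FLINK_URL + path, data=data, method=method,
                                     headers={"Content-Type": "application/json"})
        with urllib.request.urlopen(req) as resp:
            return json.load(resp)

    def cancel_with_savepoint(job_id, target_dir):
        """Step a): trigger a savepoint that also cancels the job, poll the
        async operation until it completes, and return the savepoint path."""
        trigger = _call("POST", f"/jobs/{job_id}/savepoints",
                        {"target-directory": target_dir, "cancel-job": True})
        request_id = trigger["request-id"]
        while True:
            status = _call("GET", f"/jobs/{job_id}/savepoints/{request_id}")
            if status["status"]["id"] == "COMPLETED":
                return status["operation"]["location"]
            time.sleep(2)

    def resubmit(jar_id, savepoint_path):
        """Step b): start the new job from the recorded savepoint path."""
        return _call("POST", f"/jars/{jar_id}/run",
                     {"savepointPath": savepoint_path})

One caveat: a job-cluster image does not serve the /jars endpoints (there is no web submission in that mode), so there step b) means redeploying the cluster with the recorded path rather than calling /jars/:jarid/run.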
Note that the steps above only help for a manual restart of the Flink job. There are two other possible cases:

case 1 => The job restarts by itself with the help of the Flink cluster. The latest checkpoint then takes care of the job state; nothing to worry about.
case 2 => The job fails and its state is lost. To overcome this, the best thing per the documentation is to take periodic savepoints, so that when restarting the job after a crash you can provide the latest savepoint path as an argument to your job manager program.

So the key is to have a separate job manager for your Flink jobs, which gives you this flexibility.

Regards
Bhaskar

On Wed, Sep 25, 2019 at 6:38 PM Sean Hester <sean.hes...@bettercloud.com> wrote:

> thanks for all replies! i'll definitely take a look at the Flink k8s
> Operator project.
>
> i'll try to restate the issue to clarify. this issue is specific to
> starting a job from a savepoint in job-cluster mode. in these cases the Job
> Manager container is configured to run a single Flink job at start-up. the
> savepoint needs to be provided as an argument to the entrypoint. the Flink
> documentation for this approach is here:
>
> https://github.com/apache/flink/tree/master/flink-container/kubernetes#resuming-from-a-savepoint
>
> the issue is that taking this approach means that the job will *always*
> start from the savepoint provided as the start argument in the Kubernetes
> YAML. this includes unplanned restarts of the job manager, but we'd really
> prefer any *unplanned* restarts resume from the most recent checkpoint
> instead of restarting from the configured savepoint. so in a sense we want
> the savepoint argument to be transient, only being used during the initial
> deployment, but this runs counter to the design of Kubernetes, which always
> wants to restore a deployment to the "goal state" as defined in the YAML.
>
> i hope this helps. if you want more details please let me know, and thanks
> again for your time.
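For reference, this is roughly what the configured savepoint argument looks like in a job-cluster deployment, loosely based on the flink-container template linked above (the image name, job class, and savepoint path are placeholders; --fromSavepoint and --allowNonRestoredState are the options the standalone job cluster entrypoint accepts in Flink 1.8/1.9). Because the path is baked into the spec, every restart of this pod resumes from the same savepoint:

    apiVersion: batch/v1
    kind: Job
    metadata:
      name: flink-job-cluster
    spec:
      template:
        spec:
          containers:
          - name: flink-job-cluster
            image: my-flink-job:latest                      # placeholder image
            args: ["job-cluster",
                   "--job-classname", "com.example.MyJob",  # placeholder class
                   "-Djobmanager.rpc.address=flink-job-cluster",
                   "-Dparallelism.default=2",
                   # the part that is "sticky" across restarts:
                   "--fromSavepoint", "gs://my-bucket/savepoints/savepoint-abc123",
                   "--allowNonRestoredState"]
          restartPolicy: OnFailure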
>
> On Tue, Sep 24, 2019 at 1:09 PM Hao Sun <ha...@zendesk.com> wrote:
>
>> I think I overlooked it. Good point. I am using Redis to save the path to
>> my savepoint, I might be able to set a TTL to avoid such an issue.
>>
>> Hao Sun
>>
>> On Tue, Sep 24, 2019 at 9:54 AM Yuval Itzchakov <yuva...@gmail.com> wrote:
>>
>>> Hi Hao,
>>>
>>> I think he's exactly talking about the use case where the JM/TM restart
>>> and they come back up from the latest savepoint, which might be stale by
>>> that time.
>>>
>>> On Tue, 24 Sep 2019, 19:24 Hao Sun, <ha...@zendesk.com> wrote:
>>>
>>>> We always make a savepoint before we shut down the job-cluster, so the
>>>> savepoint is always the latest. When we fix a bug or change the job graph,
>>>> it can resume well.
>>>> We only use checkpoints for unplanned downtime, e.g. K8S killed JM/TM,
>>>> an uncaught exception, etc.
>>>>
>>>> Maybe I do not understand your use case well; I do not see a need to
>>>> start from a checkpoint after a bug fix.
>>>> From what I know, you can currently use a checkpoint as a savepoint as
>>>> well.
>>>>
>>>> Hao Sun
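On the "checkpoint as a savepoint" point: for that to work across restarts, the checkpoints must be retained (externalized) rather than deleted on cancellation. A sketch of the relevant job setting, assuming the Flink 1.8/1.9 Java API (the interval is an example value):

    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CheckpointRetentionExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();

            // Take a checkpoint every 60s (example interval).
            env.enableCheckpointing(60_000);

            // Keep the last completed checkpoint on cancellation/failure so
            // it can be used for a restore, e.g. `flink run -s <chk-path>`.
            env.getCheckpointConfig().enableExternalizedCheckpoints(
                    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

            // ... define the job graph here, then env.execute("my-job");
        }
    }

This also requires state.checkpoints.dir to be set in flink-conf.yaml so the checkpoint metadata is written somewhere durable.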
>>>>
>>>> On Tue, Sep 24, 2019 at 7:48 AM Yuval Itzchakov <yuva...@gmail.com> wrote:
>>>>
>>>>> AFAIK there's currently nothing implemented to solve this problem, but
>>>>> a possible fix could be implemented on top of
>>>>> https://github.com/lyft/flinkk8soperator, which already has a pretty
>>>>> fancy state machine for rolling upgrades. I'd love to be involved, as this
>>>>> is an issue I've been thinking about as well.
>>>>>
>>>>> Yuval
>>>>>
>>>>> On Tue, Sep 24, 2019 at 5:02 PM Sean Hester <sean.hes...@bettercloud.com> wrote:
>>>>>
>>>>>> hi all--we've run into a gap (knowledge? design? tbd?) for our use
>>>>>> cases when deploying Flink jobs to start from savepoints using the
>>>>>> job-cluster mode in Kubernetes.
>>>>>>
>>>>>> we're running ~15 different jobs, all in job-cluster mode, using a
>>>>>> mix of Flink 1.8.1 and 1.9.0, under GKE (Google Kubernetes Engine). these
>>>>>> are all long-running streaming jobs, all essentially acting as
>>>>>> microservices. we're using Helm charts to configure all of our deployments.
>>>>>>
>>>>>> we have a number of use cases where we want to restart jobs from a
>>>>>> savepoint to replay recent events, i.e. when we've enhanced the job logic
>>>>>> or fixed a bug. but after the deployment we want the job to resume its
>>>>>> "long-running" behavior, where any unplanned restarts resume from the
>>>>>> latest checkpoint.
>>>>>>
>>>>>> the issue we run into is that any obvious/standard/idiomatic
>>>>>> Kubernetes deployment includes the savepoint argument in the configuration.
>>>>>> if the Job Manager container(s) have an unplanned restart, when they come
>>>>>> back up they will start from the savepoint instead of resuming from the
>>>>>> latest checkpoint. everything is working as configured, but that's not
>>>>>> exactly what we want. we want the savepoint argument to be transient
>>>>>> somehow (only used during the initial deployment), but Kubernetes doesn't
>>>>>> really support the concept of transient configuration.
>>>>>>
>>>>>> i can see a couple of potential solutions that either involve custom
>>>>>> code in the jobs or custom logic in the container (i.e. a custom entrypoint
>>>>>> script that records that the configured savepoint has already been used in
>>>>>> a file on a persistent volume or GCS, and potentially when/why/by which
>>>>>> deployment). but these seem like unexpected and hacky solutions. before we
>>>>>> head down that road i wanted to ask:
>>>>>>
>>>>>> - is this already a solved problem that i've missed?
>>>>>> - is this issue already on the community's radar?
>>>>>>
>>>>>> thanks in advance!
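For what it's worth, the "custom entrypoint" idea from the last message can be sketched in a few lines. This is a hypothetical wrapper, not an existing Flink feature: it consumes the configured savepoint only on the first start of a given deployment, recording that fact in a marker file on a persistent volume; the paths, environment variables, and downstream entrypoint are all placeholders:

    import os
    import sys

    # Hypothetical inputs: a PVC mount for the marker file, plus the savepoint
    # path and a deployment id injected via the pod spec / Helm values.
    MARKER_DIR = "/mnt/state"                      # persistent volume mount
    SAVEPOINT = os.environ.get("SAVEPOINT_PATH")   # e.g. a gs:// savepoint path
    DEPLOY_ID = os.environ.get("DEPLOYMENT_ID")    # changes on each helm upgrade

    def savepoint_args():
        """Return --fromSavepoint args only on the first start of this
        deployment; later (unplanned) restarts return no savepoint args and
        fall through to normal checkpoint recovery."""
        marker = os.path.join(MARKER_DIR, f"savepoint-used-{DEPLOY_ID}")
        if SAVEPOINT and not os.path.exists(marker):
            # Record that this deployment has consumed its configured savepoint.
            with open(marker, "w") as f:
                f.write(SAVEPOINT + "\n")
            return ["--fromSavepoint", SAVEPOINT, "--allowNonRestoredState"]
        return []

    if __name__ == "__main__":
        # Hand off to the image's real entrypoint (placeholder path).
        cmd = ["/docker-entrypoint.sh", "job-cluster", *sys.argv[1:], *savepoint_args()]
        os.execvp(cmd[0], cmd)

One caveat with this sketch: for the fall-through path to actually resume from the latest checkpoint after an unplanned Job Manager restart, the cluster still needs HA (e.g. ZooKeeper) configured so the new Job Manager can find that checkpoint; otherwise starting with no savepoint argument means starting from empty state.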