Hi Jiahui,

The savepoint format is more universal and should be used when upgrading Flink versions. If you are just upgrading the application itself, there shouldn't be a big difference between the two, afaik.
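In case it helps, here is a rough sketch of how resuming from a savepoint can be wired up programmatically when resubmitting the upgraded job (just an illustration; the savepoint path is a placeholder and the config keys should be double-checked against your Flink version):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    Configuration conf = new Configuration();
    // Savepoint (or retained checkpoint) to restore from -- placeholder path.
    conf.setString("execution.savepoint.path", "hdfs:///savepoints/savepoint-123abc");
    // Same effect as --allowNonRestoredState on the CLI: skip state that no
    // longer maps to any operator in the new job graph.
    conf.setString("execution.savepoint.ignore-unclaimed-state", "true");

    StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment(conf);
    // ... build the pipeline and call env.execute() as usual ...

The same options can also be passed on the command line when submitting the job, which is the more common way to do an upgrade.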
On Tue, Jul 6, 2021 at 9:41 PM Jiahui Jiang <qzhzm173...@hotmail.com> wrote:

> Hello Roman,
>
> Sorry, I did some more testing and the original failure was caused by a
> different part of the pipeline. When I added a new stateless operator, it
> was able to restart from the previous savepoint with no issue.
>
> Another question I have is, since you explicitly asked whether it's a
> savepoint or a checkpoint: what are the behavior differences when
> recovering from a checkpoint vs. a savepoint? If the job graph changes
> between runs, but all the stateful operators are guaranteed to have their
> UIDs fixed, will a pipeline be able to restore from the retained
> checkpoint if incremental checkpointing is disabled?
>
> Thank you!
> ------------------------------
> *From:* Roman Khachatryan <ro...@apache.org>
> *Sent:* Friday, July 2, 2021 4:59 PM
> *To:* Jiahui Jiang <qzhzm173...@hotmail.com>
> *Cc:* user@flink.apache.org <user@flink.apache.org>
> *Subject:* Re: Understanding recovering from savepoint / checkpoint with
> additional operators when chaining
>
> Hi,
>
> Just to clarify, you are recovering from a savepoint, not a retained
> checkpoint, right?
> And how are you setting the operator IDs?
> You mentioned that with allowNonRestoredState set to false, recovery
> fails. Does it succeed with this flag set to true?
>
> Answering your questions:
> Each operator state has the corresponding ID in the snapshot (though
> technically the snapshot for the chain is sent as a single object to the
> JM).
> Probably some intermediate operators have state. How do you verify that
> they don't? The exception message could probably help to identify the
> problematic operators.
>
> Regards,
> Roman
>
>
> On Fri, Jul 2, 2021 at 7:52 PM Jiahui Jiang <qzhzm173...@hotmail.com>
> wrote:
>
> Hello Flink,
>
> I'm trying to understand the state recovery mechanism when there are
> extra stateless operators.
>
> I'm using flink-sql, and I tested a 'select `number_col` from source'
> query, where the stream graph looks like:
>
> `source (stateful with fixed uid) -> [several stateless operators
> translated by Flink] -> sink (stateful with fixed uid)`
>
> I have enabled chaining, so these operators are all chained into one
> task vertex.
>
> According to Flink's docs, I should be able to start a new job with a
> different job graph as long as all the previous stateful operators can
> still be found in the graph. But when I tested recovery from the previous
> state with a new query 'select `1` from source', the generated stream
> graph had one extra stateless operator, and it failed to recover when
> allowNonRestoredState was set to false.
>
> I'm wondering how Flink stores operator state when chaining is enabled.
> Does it (1) store each operator's state separately (the source and sink
> each have their own entry in the checkpoint state), or (2) store the
> state for all the operators chained into the same subtask (source, sink,
> all the SQL transformation operators) under the same operator ID?
>
> In this experiment I have fixed the source's and sink's uids; why does
> that not seem to force the stateful operators to recover from their own
> state?
>
> Thank you!
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/state/savepoints/#savepoint-state
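To make the UID discussion above concrete, here is a minimal DataStream-style sketch of the setup being described (MySource and MySink are placeholder classes, not from the original thread; only the stateful endpoints carry fixed UIDs, while the stateless operator in between stays unnamed and chains freely):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(60_000);        // checkpoint every minute

    env.addSource(new MySource())           // stateful source (placeholder)
       .uid("my-source")                    // fixed UID so its state can be re-mapped later
       .filter(value -> true)               // stateless operator, no UID set
       .addSink(new MySink())               // stateful sink (placeholder)
       .uid("my-sink");                     // fixed UID for the sink's state

    env.execute("uid-example");

With chaining enabled, all of these can end up in a single task, but, as Roman notes above, each operator's state is still keyed by its own (fixed or auto-generated) operator ID in the snapshot; unmatched state under an auto-generated ID is typically what makes recovery fail unless allowNonRestoredState is set.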