Hello Roman,

Sorry, I did some more testing and the original failure was caused by a different part of the pipeline. When I added a new stateless operator, the job was able to restart from the previous savepoint with no issue.
Another question I have is, since you explicitly asked whether it's a savepoint or a checkpoint: what are the behavioral differences when recovering from a checkpoint vs. a savepoint? If the job graph changes between runs, but all the stateful operators are guaranteed to have fixed UIDs, will the pipeline be able to restore from a retained checkpoint if incremental checkpointing is disabled?

Thank you!

________________________________
From: Roman Khachatryan <ro...@apache.org>
Sent: Friday, July 2, 2021 4:59 PM
To: Jiahui Jiang <qzhzm173...@hotmail.com>
Cc: user@flink.apache.org <user@flink.apache.org>
Subject: Re: Understanding recovering from savepoint / checkpoint with additional operators when chaining

Hi,

Just to clarify, you are recovering from a savepoint, not a retained checkpoint, right? And how are you setting the operator IDs?

You mentioned that with allowNonRestoredState set to false, recovery fails. Does it succeed with this flag set to true?

Answering your questions: each operator state has the corresponding ID in the snapshot (though technically the snapshot for the chain is sent as a single object to the JM).

Probably some intermediate operators have state. How do you verify that they don't? The exception message could probably help to identify the problematic operators.

Regards,
Roman

On Fri, Jul 2, 2021 at 7:52 PM Jiahui Jiang <qzhzm173...@hotmail.com<mailto:qzhzm173...@hotmail.com>> wrote:

Hello Flink,

I'm trying to understand the state recovery mechanism when there are extra stateless operators. I'm using flink-sql, and I tested a 'select `number_col` from source' query, where the stream graph looks like:

`source (stateful with fixed uid) -> [several stateless operators translated by Flink] -> sink (stateful with fixed uid)`

I have enabled chaining, so these operators all chain into one task vertex.
According to Flink's docs, I should be able to start a new job with a different job graph as long as all of the previous stateful operators can still be found in the new graph. But when I tested recovery from the previous state with a new query, 'select `1` from source', the generated stream graph had one extra stateless operator, and the job failed to recover when allowNonRestoredState was set to false.

I'm wondering how Flink stores operator state when chaining is enabled. Does it (1) store each operator's state separately (the source and sink each have their own entry in the checkpoint state), or (2) store the state for all the operators chained into the same subtask (source, sink, and all the SQL transformation operators) under the same operator ID?

In this experiment I have fixed the source's and sink's uids; why does that seem to have no effect on forcing the stateful operators to recover from their own state?

Thank you!

https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/state/savepoints/#savepoint-state
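[Editor's note] The restore-matching behavior discussed in this thread can be sketched as a toy model. This is plain Python, not Flink code; the function and the operator IDs are illustrative only. Flink actually matches snapshot state handles to operators in the new job graph by operator ID (the UID hash), and auto-generated IDs change when the graph changes, which is why a leftover auto-generated entry fails the restore unless allowNonRestoredState is set:

```python
# Toy model of savepoint restore matching by operator ID.
# Not Flink's implementation; names and state values are made up.

def restore(snapshot_state, new_graph_operator_ids, allow_non_restored_state=False):
    """snapshot_state: dict of operator-id -> state from the savepoint.
    new_graph_operator_ids: set of operator IDs present in the new job graph.
    Returns the state assigned to operators in the new graph."""
    unmatched = set(snapshot_state) - set(new_graph_operator_ids)
    if unmatched and not allow_non_restored_state:
        raise RuntimeError(
            f"Cannot map snapshot state for operators {sorted(unmatched)} "
            "to the new job graph (allowNonRestoredState=false)")
    return {op: snapshot_state[op]
            for op in new_graph_operator_ids if op in snapshot_state}

# Savepoint taken from the old graph; 'auto-gen-1234' is an operator
# whose ID was auto-generated and changed in the new graph.
savepoint = {"source-uid": b"offsets", "auto-gen-1234": b"op-state", "sink-uid": b"txn-ids"}
new_graph = {"source-uid", "auto-gen-9999", "sink-uid"}

try:
    restore(savepoint, new_graph)        # fails: auto-gen-1234 has no match
except RuntimeError as e:
    print(e)

restored = restore(savepoint, new_graph, allow_non_restored_state=True)
print(sorted(restored))                  # ['sink-uid', 'source-uid']
```

Under this model, fixing the source and sink UIDs does let *their* state map over; the failure with allowNonRestoredState=false comes from any snapshot entry (e.g. an intermediate operator with state and an auto-generated ID) that no longer matches anything in the new graph.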