StephanEwen opened a new pull request #14256: URL: https://github.com/apache/flink/pull/14256
## This is based upon PR #14255, which represents the first of the three commits here.

## What is the purpose of the change

This extends the `OperatorCoordinator` with a `subtaskReset()` method.

- The `subtaskReset()` method is the notification of a local failover (pipelined region), where the `OperatorCoordinator` is not reset as a whole but resets only its subtask-dependent state.
- That method complements `resetToCheckpoint()`, which is called on a global failover and resets the entire state of the `OperatorCoordinator`.

The behavior is transparent across batch and streaming execution:

- All notifications come on every global and local failover, in both batch and streaming.
- For streaming execution, the methods are invoked with the ID of the restored checkpoint.
- For streaming execution, if there is not yet any completed checkpoint, the methods are invoked with a checkpoint ID of _-1_. That represents a "virtual empty checkpoint" that marks the "beginning of the execution". _-1_ is before any other checkpoint ID, so it fits transparently into the contract of monotonically increasing checkpoint IDs and "later-checkpoints-subsume-earlier-checkpoints".
- Batch recovery behaves exactly like a streaming recovery before the first checkpoint: there is only ever the virtual empty checkpoint that represents the beginning of the execution, and no additional checkpoint is taken. All recoveries refer to that empty checkpoint with no state. That way batch is strictly a special case of streaming.

This logic will make it easier to keep the `SplitEnumerator` for sources consistent (future PR). Instead of reacting to failure notifications, we react to restore notifications.

- Restore notifications are well ordered with checkpoints, so we eliminate the races of [FLINK-20290](https://issues.apache.org/jira/browse/FLINK-20290).
- Restore notifications generalize well across local and global failovers.
- All restores are relative to checkpoints (including the virtual empty checkpoint at the beginning). That makes split assignment tracking transparent in the sources: when a task is restored, all splits that came after the restored checkpoint have to be re-assigned. In batch, the restored checkpoint is the initial empty checkpoint, so all splits ever assigned to the task have to be re-assigned.

## Brief change log

- Commit 9f760fa improves the `resetToCheckpoint()` method by passing in the ID of the restored checkpoint.
- Commit cf30b0e adds the `subtaskReset()` method, the scheduler code to call that method, and the tests.

## Verifying this change

This PR adds a series of test scenarios. The class `OperatorCoordinatorSchedulerTest` covers the full contract of the `OperatorCoordinator` calls and notifications from the scheduler.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): **no**
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no**
- The serializers: **no**
- The runtime per-record code paths (performance sensitive): **no**
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: **yes**
- The S3 file system connector: **no**

## Documentation

- Does this pull request introduce a new feature? **no**
- If yes, how is the feature documented? **not applicable**
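
## Illustrative sketch of the split re-assignment rule

The rule stated under the purpose of the change (on restore, re-assign every split that came after the restored checkpoint) could look roughly like the sketch below. It is not code from this PR and does not use any Flink classes: `SplitAssignmentTracker`, `assign()`, `checkpointTaken()` and the `NO_CHECKPOINT` constant are hypothetical names chosen for illustration. It also assumes the tracker and the tasks agree on which splits fall before or after a checkpoint. Only the `subtaskReset()` name and the _-1_ "virtual empty checkpoint" come from the description above.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical helper (not part of Flink): tracks split assignments relative to checkpoints. */
final class SplitAssignmentTracker {

    /** Assumed stand-in for the "virtual empty checkpoint" ID -1 from the PR description. */
    static final long NO_CHECKPOINT = -1L;

    /** One assignment, tagged with the latest checkpoint that existed when it was made. */
    private static final class Assignment {
        final int subtask;
        final String split;
        final long afterCheckpoint;

        Assignment(int subtask, String split, long afterCheckpoint) {
            this.subtask = subtask;
            this.split = split;
            this.afterCheckpoint = afterCheckpoint;
        }
    }

    private final List<Assignment> assignments = new ArrayList<>();
    private long lastCheckpointId = NO_CHECKPOINT;

    /** Records that a split was handed to a subtask after the latest checkpoint. */
    void assign(int subtask, String split) {
        assignments.add(new Assignment(subtask, split, lastCheckpointId));
    }

    /** Advances the latest checkpoint; IDs must be strictly increasing. */
    void checkpointTaken(long checkpointId) {
        if (checkpointId <= lastCheckpointId) {
            throw new IllegalArgumentException("checkpoint IDs must increase");
        }
        lastCheckpointId = checkpointId;
    }

    /**
     * Local failover of one subtask: returns (and forgets) every split that was
     * assigned to it after the restored checkpoint, so it can be re-assigned.
     */
    List<String> subtaskReset(int subtask, long restoredCheckpointId) {
        List<String> toReassign = new ArrayList<>();
        Iterator<Assignment> it = assignments.iterator();
        while (it.hasNext()) {
            Assignment a = it.next();
            if (a.subtask == subtask && a.afterCheckpoint >= restoredCheckpointId) {
                toReassign.add(a.split);
                it.remove();
            }
        }
        return toReassign;
    }

    public static void main(String[] args) {
        // Streaming: split-a is covered by checkpoint 1, split-b came after it.
        SplitAssignmentTracker streaming = new SplitAssignmentTracker();
        streaming.assign(0, "split-a");
        streaming.checkpointTaken(1L);
        streaming.assign(0, "split-b");
        System.out.println(streaming.subtaskReset(0, 1L)); // prints [split-b]

        // Batch: no checkpoint is ever taken, so every restore uses NO_CHECKPOINT.
        SplitAssignmentTracker batch = new SplitAssignmentTracker();
        batch.assign(0, "split-x");
        batch.assign(0, "split-y");
        System.out.println(batch.subtaskReset(0, NO_CHECKPOINT)); // prints [split-x, split-y]
    }
}
```

Because every assignment made before the first checkpoint is tagged with _-1_, a restore to _-1_ selects all splits ever assigned to the subtask. This is how the batch case falls out as a special case of streaming without a separate code path.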