StephanEwen opened a new pull request #14256:
URL: https://github.com/apache/flink/pull/14256


   ## This is based upon PR #14255, which represents the first of the three commits here.
   
   # What is the purpose of the change
   
   This extends the `OperatorCoordinator` with a `subtaskReset()` method.
     - The `subtaskReset()` method notifies the coordinator of a local failover (restart of a pipelined region). In that case the `OperatorCoordinator` is not reset as a whole. It only resets the state that depends on the failed subtask.
     - That method complements the method `resetToCheckpoint()`, which is called on a global failover and resets the entire state of the `OperatorCoordinator`.
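To make the distinction between the two notifications concrete, here is a minimal Java sketch. `ResettableCoordinator` and `RecordingCoordinator` are hypothetical, simplified types for illustration only. They mirror just the two reset methods discussed here, and their signatures may differ from the actual Flink `OperatorCoordinator` interface.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified mirror of the two reset notifications described in
// this PR. NOT the actual Flink OperatorCoordinator interface.
interface ResettableCoordinator {
    /** Global failover: reset the entire coordinator state to the given checkpoint. */
    void resetToCheckpoint(long checkpointId);

    /** Local (pipelined region) failover: reset only the state tied to one subtask. */
    void subtaskReset(int subtask, long checkpointId);
}

// Records every notification so the difference between the two calls is visible.
class RecordingCoordinator implements ResettableCoordinator {
    final List<String> events = new ArrayList<>();

    @Override
    public void resetToCheckpoint(long checkpointId) {
        events.add("global reset to checkpoint " + checkpointId);
    }

    @Override
    public void subtaskReset(int subtask, long checkpointId) {
        events.add("subtask " + subtask + " reset to checkpoint " + checkpointId);
    }
}
```

A local failover of subtask 2 restored to checkpoint 17 would produce only the `subtaskReset(2, 17)` call, leaving the rest of the coordinator state untouched.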
   
   The behavior is transparent across batch and streaming execution:
     - All notifications are issued on every global and local failover, in both batch and streaming.
     - For streaming execution, the methods are invoked with the ID of the 
restored checkpoint.
     - For streaming execution, if there is not yet any completed checkpoint, the methods are invoked with a checkpoint ID of _-1_. That represents a "virtual empty checkpoint" that marks the "beginning of the execution". _-1_ precedes every other checkpoint ID, so it fits transparently into the contract of monotonically increasing checkpoint IDs and "later checkpoints subsume earlier checkpoints".
     - Batch recovery behaves exactly like a streaming recovery before the first checkpoint: there is only ever the virtual empty checkpoint that represents the beginning of the execution, and no additional checkpoints are taken. All recoveries refer to that empty checkpoint, which holds no state. That way, batch is strictly a special case of streaming.
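The checkpoint-ID rule above can be sketched as follows, assuming the scheduler knows the ID of the latest completed checkpoint, if there is one. `RestoreCheckpoints`, `NO_CHECKPOINT`, `restoredCheckpointId()` and `subsumes()` are hypothetical names. Only the value _-1_ comes from this PR.

```java
import java.util.OptionalLong;

// Hypothetical helper illustrating the "virtual empty checkpoint" rule: when no
// checkpoint has completed yet (always the case in batch), recovery refers to -1.
final class RestoreCheckpoints {
    // Assumed constant name; the PR only specifies the value -1.
    static final long NO_CHECKPOINT = -1L;

    private RestoreCheckpoints() {}

    /** Returns the checkpoint ID to pass to the reset notifications on recovery. */
    static long restoredCheckpointId(OptionalLong latestCompletedCheckpointId) {
        return latestCompletedCheckpointId.orElse(NO_CHECKPOINT);
    }

    /** "Later checkpoints subsume earlier checkpoints": a larger ID is newer. */
    static boolean subsumes(long laterId, long earlierId) {
        return laterId > earlierId;
    }
}
```

Because _-1_ compares as older than every real checkpoint ID, no special case is needed in code that orders or subsumes checkpoints.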
   
   This logic will make it easier to keep the `SplitEnumerator` of sources consistent (future PR).
   Instead of reacting to failure notifications, we react to restore notifications.
     - Restore notifications are well-ordered with respect to checkpoints, which eliminates the races of [FLINK-20290](https://issues.apache.org/jira/browse/FLINK-20290).
     - Restore notifications generalize well across local and global failovers.
     - All restores are relative to checkpoints (including the virtual empty checkpoint at the beginning). That makes split assignment tracking in the sources transparent: when a task is restored, all splits assigned to it after the restored checkpoint have to be re-assigned. In batch, the restored checkpoint is always the initial empty checkpoint, so all splits ever assigned to the task have to be re-assigned.
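A sketch of the split tracking this enables, under simplifying assumptions: the hypothetical `HypotheticalSplitTracker` records each assignment under the ID of the last checkpoint taken before it (starting at the virtual checkpoint _-1_), and a restore to checkpoint _C_ hands back every split recorded under _C_ or later. It is illustrative only and is not the `SplitEnumerator` logic of the future PR.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical tracker (not a Flink class): remembers which splits were assigned
// to each subtask after each checkpoint, so a restore to checkpoint C can hand
// back exactly the splits assigned after C.
class HypotheticalSplitTracker {
    // ID of the last checkpoint taken; starts at the virtual empty checkpoint -1,
    // which is the only checkpoint batch execution ever sees.
    private long lastCheckpointId = -1L;

    // subtask -> (checkpoint ID the assignment happened after -> splits)
    private final Map<Integer, TreeMap<Long, List<String>>> assignments = new HashMap<>();

    void assign(int subtask, String split) {
        assignments
            .computeIfAbsent(subtask, k -> new TreeMap<>())
            .computeIfAbsent(lastCheckpointId, k -> new ArrayList<>())
            .add(split);
    }

    // Assumed to be called when the coordinator snapshots its state for a checkpoint.
    void onCheckpoint(long checkpointId) {
        lastCheckpointId = checkpointId;
    }

    /** Removes and returns all splits assigned to the subtask after the restored checkpoint. */
    List<String> splitsToReassign(int subtask, long restoredCheckpointId) {
        List<String> result = new ArrayList<>();
        TreeMap<Long, List<String>> perCheckpoint = assignments.get(subtask);
        if (perCheckpoint == null) {
            return result;
        }
        // Keys >= restoredCheckpointId were recorded after that checkpoint was taken.
        Map<Long, List<String>> afterRestore = perCheckpoint.tailMap(restoredCheckpointId, true);
        for (List<String> splits : afterRestore.values()) {
            result.addAll(splits);
        }
        afterRestore.clear(); // the view is backed by perCheckpoint, so this removes them
        return result;
    }
}
```

In batch, no checkpoint is ever taken, so every assignment is recorded under _-1_ and a restore to _-1_ returns all splits ever assigned to the subtask, matching the last bullet above.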
   
   ## Brief change log
   
     - Commit 9f760fa improves the `resetToCheckpoint()` method by passing in 
the ID of the restored checkpoint.
     - Commit cf30b0e adds the `subtaskReset()` method, the scheduler code to 
call that method, and the tests.
   
   ## Verifying this change
   
   This PR adds a series of test scenarios. The class 
`OperatorCoordinatorSchedulerTest` covers the full contract of the 
`OperatorCoordinator` calls and notifications from the scheduler.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): **no**
     - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
     - The serializers: **no**
     - The runtime per-record code paths (performance sensitive): **no**
     - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: **yes**
     - The S3 file system connector: **no**
   
   ## Documentation
   
     - Does this pull request introduce a new feature? **no**
     - If yes, how is the feature documented? **not applicable**
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

