becketqin opened a new pull request #12510:
URL: https://github.com/apache/flink/pull/12510


   ## What is the purpose of the change
   Currently the `SourceCoordinator` assumes that the `resetToCheckpoint()` 
method is invoked before the `start()` method. This is because it is 
difficult to isolate the behavior of the enumerator before and after the reset. 
In general, it is safer to create a new instance of the 
`OperatorCoordinator` than to restart the existing one on the fly.
   
   This patch implements the above re-creation logic for the SourceCoordinator. 
It has two commits. 
   
   The first commit adds a new method `cancelAsyncCalls()` to the 
`SplitEnumeratorContext`. It helps enumerator implementations close 
themselves cleanly. For example, a Kafka enumerator may have a periodic 
partition discovery async call running, which uses a consumer instance. When 
closing the enumerator, one needs to stop all the async calls before closing 
the consumer instance. `cancelAsyncCalls()` helps users do that. Otherwise each 
enumerator has to implement its own logic.
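   
   The close ordering described above can be sketched as follows. This is a 
minimal, self-contained illustration and not the code in this patch: 
`SketchEnumeratorContext`, `SketchConsumer` and `SketchKafkaEnumerator` are 
hypothetical stand-ins, and the assumption that `cancelAsyncCalls()` waits for 
an in-flight call to finish belongs to the sketch only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for the async-call part of a SplitEnumeratorContext. */
class SketchEnumeratorContext {
    private final ScheduledExecutorService worker =
            Executors.newSingleThreadScheduledExecutor();
    private final List<ScheduledFuture<?>> asyncCalls = new ArrayList<>();

    /** Registers a call that runs repeatedly on the worker thread. */
    synchronized void callAsyncPeriodically(Runnable call, long periodMs) {
        asyncCalls.add(
                worker.scheduleAtFixedRate(call, 0, periodMs, TimeUnit.MILLISECONDS));
    }

    /**
     * Sketch assumption: cancels all registered calls, shuts the worker down
     * and waits until a call that is already running has finished.
     */
    synchronized void cancelAsyncCalls() throws InterruptedException {
        asyncCalls.forEach(f -> f.cancel(false));
        asyncCalls.clear();
        worker.shutdown();
        worker.awaitTermination(10, TimeUnit.SECONDS);
    }
}

/** Hypothetical consumer that counts calls made after it was closed. */
class SketchConsumer {
    private volatile boolean closed = false;
    final AtomicInteger callsAfterClose = new AtomicInteger();

    void listPartitions() {
        if (closed) {
            callsAfterClose.incrementAndGet();
        }
    }

    void close() {
        closed = true;
    }
}

/** Hypothetical enumerator with periodic partition discovery. */
class SketchKafkaEnumerator {
    private final SketchEnumeratorContext context;
    private final SketchConsumer consumer;

    SketchKafkaEnumerator(SketchEnumeratorContext context, SketchConsumer consumer) {
        this.context = context;
        this.consumer = consumer;
    }

    void start() {
        context.callAsyncPeriodically(consumer::listPartitions, 5);
    }

    void close() throws InterruptedException {
        context.cancelAsyncCalls(); // first stop everything that uses the consumer
        consumer.close();           // only then close the consumer itself
    }
}
```

   With this ordering, no discovery call can reach the consumer after it has 
been closed, and the enumerator does not need its own bookkeeping of pending 
calls.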
   
   The second commit introduces a `RecreateOnResetOperatorCoordinator` class. 
The purpose of this class is to make sure that a checkpoint is always restored 
into a newly created operator coordinator instance. This is a wrapper class for 
an `OperatorCoordinator`. It delegates all the methods to the contained 
`OperatorCoordinator` except `resetToCheckpoint()`. When `resetToCheckpoint()` 
is called, it first closes the contained `OperatorCoordinator`, then creates a 
new instance and resets the newly created `OperatorCoordinator` instance to the 
given checkpoint.
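   
   The wrapper behavior described above can be sketched roughly as below. This 
is an illustration under stated assumptions, not the class added by this patch: 
`SketchCoordinator` is a hypothetical, trimmed-down interface (the real 
`OperatorCoordinator` has more methods), the `Supplier`-based factory is an 
assumption of the sketch, and `RecordingCoordinator` exists only to make the 
effect observable.

```java
import java.util.function.Supplier;

/** Hypothetical, trimmed-down coordinator interface used only by this sketch. */
interface SketchCoordinator {
    void start() throws Exception;

    void resetToCheckpoint(byte[] checkpointData) throws Exception;

    void close() throws Exception;
}

/** Wrapper that delegates every call except resetToCheckpoint(). */
class RecreateOnResetSketch implements SketchCoordinator {
    private final Supplier<SketchCoordinator> factory; // assumption: a plain factory
    private SketchCoordinator current;

    RecreateOnResetSketch(Supplier<SketchCoordinator> factory) {
        this.factory = factory;
        this.current = factory.get();
    }

    @Override
    public void start() throws Exception {
        current.start();
    }

    @Override
    public void resetToCheckpoint(byte[] checkpointData) throws Exception {
        current.close();                           // 1. close the contained instance
        current = factory.get();                   // 2. create a fresh instance
        current.resetToCheckpoint(checkpointData); // 3. restore into the fresh one
    }

    @Override
    public void close() throws Exception {
        current.close();
    }
}

/** Hypothetical coordinator that records what happened to it. */
class RecordingCoordinator implements SketchCoordinator {
    boolean started;
    boolean closed;
    byte[] restored;

    @Override
    public void start() {
        started = true;
    }

    @Override
    public void resetToCheckpoint(byte[] checkpointData) {
        restored = checkpointData;
    }

    @Override
    public void close() {
        closed = true;
    }
}
```

   Because the restored instance is always fresh, the contained coordinator 
never has to distinguish between its state before and after a reset.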
   
   ## Brief change log
   75c6ec4c8432735152f1a597aac807faba412ed3 adds a `cancelAsyncCalls()` method 
to the `SplitEnumeratorContext`.
   c85823ceb5fbf877398faf98764874d8fcc6d1e2 introduces a new class, 
`RecreateOnResetOperatorCoordinator`, which creates a new operator coordinator 
instance upon checkpoint reset.
   
   ## Verifying this change
   New unit tests are added to verify the change:
   ExecutionNotifierTest
   SourceCoordinatorProviderTest
   RecreateOnResetOperatorCoordinatorTest
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (yes)
     - The S3 file system connector: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (no)
     - If yes, how is the feature documented? (not applicable)
   



