becketqin opened a new pull request #12510: URL: https://github.com/apache/flink/pull/12510
## What is the purpose of the change

Currently, the `SourceCoordinator` assumes that its `resetToCheckpoint()` method is invoked before its `start()` method. This is because it is difficult to isolate the behavior of the enumerator before and after a reset. In general, it is safer to create a new instance of the `OperatorCoordinator` than to restart an existing one on the fly. This patch implements that re-creation logic for the `SourceCoordinator`. It has two commits.

The first commit adds a new method, `cancelAsyncCalls()`, to the `SplitEnumeratorContext`. It helps enumerator implementations close themselves cleanly. For example, a Kafka enumerator may have a periodic partition-discovery async call running that uses a consumer instance. When closing the enumerator, all async calls must be stopped before the consumer instance is closed. `cancelAsyncCalls()` helps users do that; otherwise each enumerator would have to implement its own logic.

The second commit introduces a `RecreateOnResetOperatorCoordinator` class, whose purpose is to ensure that checkpoints are always restored into a newly created operator coordinator instance. It is a wrapper around an `OperatorCoordinator` and delegates all methods to the contained `OperatorCoordinator` except `resetToCheckpoint()`. When `resetToCheckpoint()` is called, it first closes the contained `OperatorCoordinator`, then creates a new instance and resets that new instance to the given checkpoint.

## Brief change log

- 75c6ec4c8432735152f1a597aac807faba412ed3 adds a `cancelAsyncCalls()` method to the `SplitEnumeratorContext`.
- c85823ceb5fbf877398faf98764874d8fcc6d1e2 introduces a new `RecreateOnResetOperatorCoordinator` class that creates a new operator coordinator instance upon checkpoint reset.
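The wrapper described for the second commit can be illustrated with a small, self-contained Java sketch. It is not Flink's code: the `Coordinator` interface below is a hypothetical, heavily simplified stand-in for `OperatorCoordinator`, and the `Supplier<Coordinator>` factory, `RecreateOnResetCoordinator`, and `RecordingCoordinator` names are assumptions made for illustration. Restarting the fresh instance after a reset only when the wrapper had already been started is likewise a policy choice of this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class RecreateOnResetSketch {

    /** Hypothetical, simplified stand-in for Flink's OperatorCoordinator (not the real interface). */
    interface Coordinator extends AutoCloseable {
        void start() throws Exception;

        void handleEvent(String event) throws Exception;

        void resetToCheckpoint(byte[] checkpointData) throws Exception;

        @Override
        void close() throws Exception;
    }

    /** Delegates every call to the current instance, except resetToCheckpoint(). */
    static final class RecreateOnResetCoordinator implements Coordinator {
        private final Supplier<Coordinator> factory;
        private Coordinator current;
        private boolean started;

        RecreateOnResetCoordinator(Supplier<Coordinator> factory) {
            this.factory = factory;
            this.current = factory.get();
        }

        @Override
        public void start() throws Exception {
            current.start();
            started = true;
        }

        @Override
        public void handleEvent(String event) throws Exception {
            current.handleEvent(event);
        }

        @Override
        public void resetToCheckpoint(byte[] checkpointData) throws Exception {
            // Close the old instance so none of its in-flight state survives the reset.
            current.close();
            // Build a fresh instance and restore the checkpoint before it is started.
            Coordinator fresh = factory.get();
            fresh.resetToCheckpoint(checkpointData);
            current = fresh;
            // Assumption of this sketch: restart only if the wrapper was already running.
            if (started) {
                current.start();
            }
        }

        @Override
        public void close() throws Exception {
            current.close();
        }
    }

    /** Toy coordinator that records its lifecycle calls so the behavior is observable. */
    static final class RecordingCoordinator implements Coordinator {
        final List<String> log = new ArrayList<>();

        @Override
        public void start() { log.add("start"); }

        @Override
        public void handleEvent(String event) { log.add("event:" + event); }

        @Override
        public void resetToCheckpoint(byte[] checkpointData) { log.add("reset:" + checkpointData.length); }

        @Override
        public void close() {
            // A real enumerator would stop its async calls here first (the PR adds
            // SplitEnumeratorContext#cancelAsyncCalls() for this) before closing resources.
            log.add("close");
        }
    }

    public static void main(String[] args) throws Exception {
        List<RecordingCoordinator> created = new ArrayList<>();
        RecreateOnResetCoordinator wrapper = new RecreateOnResetCoordinator(() -> {
            RecordingCoordinator c = new RecordingCoordinator();
            created.add(c);
            return c;
        });

        wrapper.start();
        wrapper.handleEvent("split-request");
        wrapper.resetToCheckpoint(new byte[] {1, 2, 3});
        wrapper.handleEvent("split-request");
        wrapper.close();

        System.out.println(created.get(0).log); // [start, event:split-request, close]
        System.out.println(created.get(1).log); // [reset:3, start, event:split-request, close]
    }
}
```

Running `main` shows that the first instance is closed during the reset and that all later calls reach the second instance, which was restored from the checkpoint before being started. Keeping the re-creation in a wrapper means the wrapped coordinator never has to separate its pre-reset and post-reset behavior.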
## Verifying this change

New unit tests are added to verify the change:

- `ExecutionNotifierTest`
- `SourceCoordinatorProviderTest`
- `RecreateOnResetOperatorCoordinatorTest`

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (yes)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)