gaoyunhaii opened a new pull request #15055: URL: https://github.com/apache/flink/pull/15055
## What is the purpose of the change

This PR makes `CheckpointBarrierHandler` support alignment after some input channels have received `EndOfPartition`.

Based on the discussion under https://github.com/apache/flink/pull/14831, to support different types of checkpoints (e.g., aligned and unaligned checkpoints), the upstream tasks are held until the downstream tasks have processed all the records, and only then emit `EndOfPartitionEvent`. For an input channel that has received `EndOfPartition`, the `CheckpointBarrierHandler` marks the channel as aligned for all pending checkpoints and no longer considers it for subsequent checkpoints.

Note that the current implementation does not need to modify the channel persistence logic: an input channel only decides whether to persist a single `Buffer`, while when to start and when to stop persisting is decided by `CheckpointBarrierHandler`.

## Brief change log

- b64a8a6a30fb96b6628dc09330e9a544f34e3569 introduces a new `EndOfUserRecordsEvent`, which is sent after all the user records.
- 98076138b50f59402f4d7c56824d1b42c08fcfa9 modifies `ResultPartition` to support waiting for the downstream tasks to process all the pending records. Only `PipelinedResultPartition` actually implements this functionality; other types of `ResultPartition` return immediately.
- ab2ef45b003b2d26c9cbaf07d3327933ed0540dc makes `StreamTask` wait for the downstream tasks to process all the pending records if checkpointing is enabled.
- 819512aa61f5e9143754fde9b6d97394d85bd2a1 makes downstream tasks acknowledge the `EndOfUserRecordsEvent` to notify the upstream task.
- 06c014d726be4bfd79cee9219ca1918ace6916ef refactors the checkpoint barrier handler in preparation for the subsequent modifications.
- 4b4e8095064900352983e53959c4012faa26d621 modifies the checkpoint barrier handler to support alignment with `EndOfPartition` received.
- a70ca2dd1da50eeee0b25bad9828c7bc0f809e92 allows taking snapshots with closed operators when checkpoints after tasks finished are enabled.
- 611db33cac0513d5e5ba44a66c6bf217aa0e68a2 adds tests for triggering checkpoints for non-source tasks.

## Verifying this change

This change adds tests and can be verified via the added unit tests.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): **no**
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no**
- The serializers: **no**
- The runtime per-record code paths (performance sensitive): **no**
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: **no, the functionality will be enabled in a following PR.**
- The S3 file system connector: **no**

## Documentation

- Does this pull request introduce a new feature? **no**
- If yes, how is the feature documented? **not applicable**
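
For readers unfamiliar with barrier alignment, the behavior described under "What is the purpose of the change" can be illustrated with a small, self-contained sketch. This is not the code from this PR and not Flink's `CheckpointBarrierHandler`; the class `AlignmentTracker` and its methods `onBarrier` and `onEndOfPartition` are hypothetical names chosen for illustration. The sketch assumes channels are identified by integer indices, and it models "no longer considered" by pre-marking finished channels as aligned for every new checkpoint.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Hypothetical, simplified model of barrier alignment; not Flink's actual handler. */
final class AlignmentTracker {
    private final int numChannels;
    // Channels that have received EndOfPartition (assumption: identified by index).
    private final Set<Integer> finishedChannels = new HashSet<>();
    // Pending checkpoint id -> channels already aligned for that checkpoint.
    private final Map<Long, Set<Integer>> pending = new TreeMap<>();

    AlignmentTracker(int numChannels) {
        this.numChannels = numChannels;
    }

    /** Records a barrier; returns true once the checkpoint is aligned on all channels. */
    boolean onBarrier(long checkpointId, int channel) {
        // A new checkpoint starts with all finished channels already counted as aligned.
        Set<Integer> aligned =
                pending.computeIfAbsent(checkpointId, id -> new HashSet<>(finishedChannels));
        aligned.add(channel);
        if (aligned.size() == numChannels) {
            pending.remove(checkpointId);
            return true;
        }
        return false;
    }

    /** Handles EndOfPartition; returns ids of checkpoints that became aligned as a result. */
    List<Long> onEndOfPartition(int channel) {
        finishedChannels.add(channel);
        List<Long> completed = new ArrayList<>();
        Iterator<Map.Entry<Long, Set<Integer>>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Set<Integer>> entry = it.next();
            // Mark the finished channel as aligned for every pending checkpoint.
            entry.getValue().add(channel);
            if (entry.getValue().size() == numChannels) {
                completed.add(entry.getKey());
                it.remove();
            }
        }
        return completed;
    }
}
```

In this model, with three channels, a checkpoint that has barriers from channels 0 and 1 completes alignment as soon as channel 2 reports `EndOfPartition`, and any later checkpoint only needs barriers from channels 0 and 1.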