pvary opened a new pull request, #23555: URL: https://github.com/apache/flink/pull/23555
## What is the purpose of the change

This is the implementation of the main part of [FLIP-371: Provide initialization context for Committer creation in TwoPhaseCommittingSink](https://cwiki.apache.org/confluence/display/FLINK/FLIP-371%3A+Provide+initialization+context+for+Committer+creation+in+TwoPhaseCommittingSink). It modifies the creation of the `TwoPhaseCommittingSink.Committer` by providing a `CommitterInitContext` to the `createCommitter` method.

The change is built on top of #23541, so please review those changes there and check only the 2nd commit here.

## Brief change log

API changes:
- `TwoPhaseCommittingSink` - new `createCommitter` method with a default implementation, and deprecation of the old method
- `CommitterInitContext` - new object for committer initialization and metrics
- `SinkCommitterMetricGroup` - new metrics group for committer-related metrics

Implementation:
- `InternalSinkCommitterMetricGroup` - implementation of `SinkCommitterMetricGroup`
- `CommitterOperator` - creates the `CommitterInitContext` and the `SinkCommitterMetricGroup`, and its initialization now uses the new `Committer` creation method
- Updating the metrics in `CommitRequestImpl` and `CheckpointCommittableManagerImpl`
- Propagating the new metric group object until it finally arrives at `CommitRequestImpl`, where we need it (`CommitterOperator` -> `CommittableCollector` -> `CheckpointCommittableManagerImpl` -> `SubtaskCommittableManager` -> `CommitRequestImpl`)
  - This part of the change is somewhat hard to follow, and I am open to suggestions to make it better
- SinkAdapterV1 changes to use the new `Committer` creation method and the metrics group (`SinkAdapterV1`, `GlobalCommitterSerializer`, `GlobalCommitterOperator`)
- Updating the tests to propagate the metric group
- Adding a new test case to `SinkV2MetricsITCase` to make sure that the metrics are propagated as expected
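
The API change listed above (a new context-taking `createCommitter` with a default implementation, next to a deprecated old method) can be sketched roughly as follows. This is a simplified, self-contained illustration and not the code in this PR: `TwoPhaseSinkSketch`, `InitContextSketch`, `CommitterMetricsSketch`, `CommitterSketch`, `successCounter` and the demo sinks are hypothetical stand-ins, and the assumption that the default implementation delegates to the deprecated method is mine.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a committer metric group (not the Flink class).
interface CommitterMetricsSketch {
    AtomicLong successCounter();
}

// Hypothetical stand-in for the committer initialization context.
interface InitContextSketch {
    int subtaskId();

    CommitterMetricsSketch metricGroup();
}

// Hypothetical stand-in for a committer.
interface CommitterSketch<CommT> {
    void commit(Collection<CommT> committables);
}

// Hypothetical stand-in for the sink interface that creates committers.
interface TwoPhaseSinkSketch<CommT> {
    /** Old factory method, kept so that existing sinks still compile. */
    @Deprecated
    default CommitterSketch<CommT> createCommitter() {
        throw new UnsupportedOperationException(
                "Override createCommitter(InitContextSketch) instead.");
    }

    /** New factory method; assumed here to fall back to the old one by default. */
    default CommitterSketch<CommT> createCommitter(InitContextSketch context) {
        return createCommitter();
    }
}

final class CommitterInitSketch {
    /** An existing sink that only knows the deprecated method. */
    static final class LegacySink implements TwoPhaseSinkSketch<String> {
        final List<String> committed = new ArrayList<>();

        @Override
        @Deprecated
        public CommitterSketch<String> createCommitter() {
            return committed::addAll;
        }
    }

    /** A sink that uses the context to update a metric on every commit. */
    static final class MetricsAwareSink implements TwoPhaseSinkSketch<String> {
        @Override
        public CommitterSketch<String> createCommitter(InitContextSketch context) {
            return committables ->
                    context.metricGroup().successCounter().addAndGet(committables.size());
        }
    }

    /** Builds a context the way an operator might, wrapping a shared counter. */
    static InitContextSketch contextFor(int subtaskId, AtomicLong counter) {
        CommitterMetricsSketch metrics = () -> counter;
        return new InitContextSketch() {
            @Override
            public int subtaskId() {
                return subtaskId;
            }

            @Override
            public CommitterMetricsSketch metricGroup() {
                return metrics;
            }
        };
    }

    public static void main(String[] args) {
        AtomicLong counter = new AtomicLong();
        InitContextSketch context = contextFor(0, counter);

        // The caller always uses the new method; legacy sinks keep working.
        LegacySink legacy = new LegacySink();
        legacy.createCommitter(context).commit(List.of("a", "b"));
        new MetricsAwareSink().createCommitter(context).commit(List.of("x", "y", "z"));

        System.out.println(legacy.committed + " " + counter.get());
    }
}
```

With this shape, the caller only ever invokes the context-taking method, so sinks that still override the deprecated method keep working while new sinks get access to the metric group.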

## Verifying this change

Added a new unit test, and modified the existing ones to use the new method.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
- The S3 file system connector: no

## Documentation

- Does this pull request introduce a new feature? yes
- If yes, how is the feature documented? not documented. It might be worth checking whether SinkV2 documentation exists and, if it does, updating it.