tzulitai commented on code in PR #23555: URL: https://github.com/apache/flink/pull/23555#discussion_r1369177945
########## flink-core/src/main/java/org/apache/flink/api/connector/sink2/TwoPhaseCommittingSink.java: ########## @@ -77,4 +97,42 @@ interface PrecommittingSinkWriter<InputT, CommT> extends SinkWriter<InputT> { */ Collection<CommT> prepareCommit() throws IOException, InterruptedException; } + + /** The interface exposes some runtime info for creating a {@link Committer}. */ + @PublicEvolving + interface CommitterInitContext { Review Comment: I'm wondering if it makes sense to have this and the existing sink writer's `InitContext` extend from a common interface, as it seems all methods except from the metric group retrieval is shared. Having that would make sure that these shared methods don't diverge across the `InitContext`s in the future, which can be confusing for implementors given how tightly coupled the committer and sink writer is. ########## flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommitRequestImpl.java: ########## @@ -32,16 +33,27 @@ public class CommitRequestImpl<CommT> implements Committer.CommitRequest<CommT> private CommT committable; private int numRetries; private CommitRequestState state; + private SinkCommitterMetricGroup metricGroup; - protected CommitRequestImpl(CommT committable) { + protected CommitRequestImpl(CommT committable, SinkCommitterMetricGroup metricGroup) { this.committable = committable; + this.metricGroup = metricGroup; state = CommitRequestState.RECEIVED; + + // Currently only the SubtaskCommittableManager uses this constructor to create a new + // CommitRequestImpl, so we can increment the metrics here + metricGroup.getNumCommittablesTotalCounter().inc(); Review Comment: this feels a bit hacky, as your comment already hints. My main issue with this is that this happening in the constructor, if we only look at this class locally it's hard to tell if we're incrementing it correctly. For example, a deserializer for `CommitRequestImpl`s can totally call this constructor and unintentionally increment the metric. I see that you've already wired in the `SinkCommitterMetricGroup` to the `SubtaskCommittableManager`. Can we just increment the total # of committables there in the `add` method then? ########## flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittableManager.java: ########## @@ -72,7 +86,7 @@ void add(CommittableWithLineage<CommT> committable) { void add(CommT committable) { checkState(requests.size() < numExpectedCommittables, "Already received all committables."); - requests.add(new CommitRequestImpl<>(committable)); + requests.add(new CommitRequestImpl<>(committable, metricGroup)); Review Comment: Increment total # of committables here instead of within `CommitRequestImpl` constructor. ########## flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2MetricsITCase.java: ########## @@ -116,6 +125,58 @@ public void testMetrics() throws Exception { jobClient.getJobExecutionResult().get(); } + @Test + public void testCommitterMetrics() throws Exception { Review Comment: 👍 ########## flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2MetricsITCase.java: ########## @@ -116,6 +125,58 @@ public void testMetrics() throws Exception { jobClient.getJobExecutionResult().get(); } + @Test + public void testCommitterMetrics() throws Exception { Review Comment: 👍 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org