tzulitai commented on code in PR #23555:
URL: https://github.com/apache/flink/pull/23555#discussion_r1369177945


##########
flink-core/src/main/java/org/apache/flink/api/connector/sink2/TwoPhaseCommittingSink.java:
##########
@@ -77,4 +97,42 @@ interface PrecommittingSinkWriter<InputT, CommT> extends SinkWriter<InputT> {
          */
         Collection<CommT> prepareCommit() throws IOException, InterruptedException;
     }
+
+    /** The interface exposes some runtime info for creating a {@link Committer}. */
+    @PublicEvolving
+    interface CommitterInitContext {

Review Comment:
   I'm wondering if it makes sense to have this and the existing sink writer's 
`InitContext` extend a common interface, since all methods except the metric 
group retrieval appear to be shared. That would keep the shared methods from 
diverging across the two `InitContext`s in the future, which would be confusing 
for implementors given how tightly coupled the committer and sink writer are.
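
   A minimal sketch of the suggestion, not the actual Flink API: the names 
`CommonInitContext`, `WriterInitContextSketch`, `CommitterInitContextSketch` 
and the stub metric group types are hypothetical, and the three shared methods 
are only illustrative of what the common parent might hold.

```java
import java.util.OptionalLong;

/** Hypothetical shared parent: runtime info needed by both writer and committer. */
interface CommonInitContext {
    int getSubtaskId();

    int getNumberOfParallelSubtasks();

    OptionalLong getRestoredCheckpointId();
}

/** Placeholder types standing in for the real writer/committer metric groups. */
interface WriterMetricGroupStub {}

interface CommitterMetricGroupStub {}

/** Writer-side context: adds only the writer-specific metric group. */
interface WriterInitContextSketch extends CommonInitContext {
    WriterMetricGroupStub metricGroup();
}

/** Committer-side context: adds only the committer-specific metric group. */
interface CommitterInitContextSketch extends CommonInitContext {
    CommitterMetricGroupStub metricGroup();
}

/** Helper that depends only on the shared part, so it works for both contexts. */
final class InitContextSketch {
    private InitContextSketch() {}

    static String describe(CommonInitContext ctx) {
        return "subtask " + ctx.getSubtaskId() + "/" + ctx.getNumberOfParallelSubtasks();
    }
}
```

   With this shape, a method added to the shared parent shows up in both 
contexts at once, and only the metric group accessor differs between them.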



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommitRequestImpl.java:
##########
@@ -32,16 +33,27 @@ public class CommitRequestImpl<CommT> implements Committer.CommitRequest<CommT>
     private CommT committable;
     private int numRetries;
     private CommitRequestState state;
+    private SinkCommitterMetricGroup metricGroup;
 
-    protected CommitRequestImpl(CommT committable) {
+    protected CommitRequestImpl(CommT committable, SinkCommitterMetricGroup metricGroup) {
         this.committable = committable;
+        this.metricGroup = metricGroup;
         state = CommitRequestState.RECEIVED;
+
+        // Currently only the SubtaskCommittableManager uses this constructor to create a new
+        // CommitRequestImpl, so we can increment the metrics here
+        metricGroup.getNumCommittablesTotalCounter().inc();

Review Comment:
   This feels a bit hacky, as your comment already hints. My main issue is 
that the increment happens in the constructor: looking at this class in 
isolation, it's hard to tell whether we're incrementing the metric correctly. 
For example, a deserializer for `CommitRequestImpl`s could call this 
constructor and unintentionally increment the metric.
   
   I see that you've already wired the `SinkCommitterMetricGroup` into the 
`SubtaskCommittableManager`. Could we increment the total number of 
committables there, in the `add` method, instead?
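
   A rough sketch of what I mean, using simplified stand-ins rather than the 
real classes: `CommitterMetricsSketch`, `CommitRequestSketch` and 
`SubtaskCommittableManagerSketch` are hypothetical names, and an `AtomicLong` 
replaces the actual counter from the metric group.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/** Simplified stand-in for the committer metric group (assumption, not Flink's class). */
final class CommitterMetricsSketch {
    final AtomicLong numCommittablesTotal = new AtomicLong();
}

/** Commit request whose constructor has no metric side effects. */
final class CommitRequestSketch<CommT> {
    private final CommT committable;

    CommitRequestSketch(CommT committable) {
        this.committable = committable;
    }

    CommT getCommittable() {
        return committable;
    }
}

/** Manager that owns the metric update, so it happens exactly once per received committable. */
final class SubtaskCommittableManagerSketch<CommT> {
    private final List<CommitRequestSketch<CommT>> requests = new ArrayList<>();
    private final int numExpectedCommittables;
    private final CommitterMetricsSketch metrics;

    SubtaskCommittableManagerSketch(int numExpectedCommittables, CommitterMetricsSketch metrics) {
        this.numExpectedCommittables = numExpectedCommittables;
        this.metrics = metrics;
    }

    void add(CommT committable) {
        if (requests.size() >= numExpectedCommittables) {
            throw new IllegalStateException("Already received all committables.");
        }
        requests.add(new CommitRequestSketch<>(committable));
        // Count here, at the point where a committable is actually received.
        metrics.numCommittablesTotal.incrementAndGet();
    }

    int numRequests() {
        return requests.size();
    }
}
```

   Any other caller of the request constructor (a deserializer, a test) then 
leaves the counter untouched.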



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittableManager.java:
##########
@@ -72,7 +86,7 @@ void add(CommittableWithLineage<CommT> committable) {
 
     void add(CommT committable) {
         checkState(requests.size() < numExpectedCommittables, "Already received all committables.");
-        requests.add(new CommitRequestImpl<>(committable));
+        requests.add(new CommitRequestImpl<>(committable, metricGroup));

Review Comment:
   Increment the total number of committables here instead of within the 
`CommitRequestImpl` constructor.



##########
flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2MetricsITCase.java:
##########
@@ -116,6 +125,58 @@ public void testMetrics() throws Exception {
         jobClient.getJobExecutionResult().get();
     }
 
+    @Test
+    public void testCommitterMetrics() throws Exception {

Review Comment:
   👍 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to