pvary commented on code in PR #23555:
URL: https://github.com/apache/flink/pull/23555#discussion_r1370193137


##########
flink-core/src/main/java/org/apache/flink/api/connector/sink2/TwoPhaseCommittingSink.java:
##########
@@ -77,4 +97,42 @@ interface PrecommittingSinkWriter<InputT, CommT> extends 
SinkWriter<InputT> {
          */
         Collection<CommT> prepareCommit() throws IOException, 
InterruptedException;
     }
+
+    /** The interface exposes some runtime info for creating a {@link 
Committer}. */
+    @PublicEvolving
+    interface CommitterInitContext {

Review Comment:
   Created a common `InitContext`. See: 
https://github.com/apache/flink/pull/23555/files#diff-d80129086eb15764c75da260ebb116757f01485854277f399da30af99713eb57
   
   This `InitContext` serves as a base for `SinkV2.InitContext`, and the 
`TwoPhaseCommittingSink.CommitterInitContext`.
   
   I am undecided on the naming of these classes/interfaces:
   - Option 1: Stick to the current API specification. See: the PR.
   - Option 2: Rename the old `SinkV2.InitContext` to `SinkV2.WriterInitContext`
       - It is easier to understand/use in the long run
       - Breaks the user code on 1.18->1.19 migration
   
   I would vote for Option2 so we are better off on the long urun, but not sure 
how flexible can we be with the API.



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommitRequestImpl.java:
##########
@@ -32,16 +33,27 @@ public class CommitRequestImpl<CommT> implements 
Committer.CommitRequest<CommT>
     private CommT committable;
     private int numRetries;
     private CommitRequestState state;
+    private SinkCommitterMetricGroup metricGroup;
 
-    protected CommitRequestImpl(CommT committable) {
+    protected CommitRequestImpl(CommT committable, SinkCommitterMetricGroup 
metricGroup) {
         this.committable = committable;
+        this.metricGroup = metricGroup;
         state = CommitRequestState.RECEIVED;
+
+        // Currently only the SubtaskCommittableManager uses this constructor 
to create a new
+        // CommitRequestImpl, so we can increment the metrics here
+        metricGroup.getNumCommittablesTotalCounter().inc();

Review Comment:
   Moved.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to