pvary commented on code in PR #23555: URL: https://github.com/apache/flink/pull/23555#discussion_r1370193137
########## flink-core/src/main/java/org/apache/flink/api/connector/sink2/TwoPhaseCommittingSink.java: ########## @@ -77,4 +97,42 @@ interface PrecommittingSinkWriter<InputT, CommT> extends SinkWriter<InputT> { */ Collection<CommT> prepareCommit() throws IOException, InterruptedException; } + + /** The interface exposes some runtime info for creating a {@link Committer}. */ + @PublicEvolving + interface CommitterInitContext { Review Comment: Created a common `InitContext`. See: https://github.com/apache/flink/pull/23555/files#diff-d80129086eb15764c75da260ebb116757f01485854277f399da30af99713eb57 This `InitContext` serves as a base for `SinkV2.InitContext`, and the `TwoPhaseCommittingSink.CommitterInitContext`. I am undecided on the naming of these classes/interfaces: - Option 1: Stick to the current API specification. See: the PR. - Option 2: Rename the old `SinkV2.InitContext` to `SinkV2.WriterInitContext` - It is easier to understand/use in the long run - Breaks the user code on 1.18->1.19 migration I would vote for Option2 so we are better off on the long urun, but not sure how flexible can we be with the API. ########## flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommitRequestImpl.java: ########## @@ -32,16 +33,27 @@ public class CommitRequestImpl<CommT> implements Committer.CommitRequest<CommT> private CommT committable; private int numRetries; private CommitRequestState state; + private SinkCommitterMetricGroup metricGroup; - protected CommitRequestImpl(CommT committable) { + protected CommitRequestImpl(CommT committable, SinkCommitterMetricGroup metricGroup) { this.committable = committable; + this.metricGroup = metricGroup; state = CommitRequestState.RECEIVED; + + // Currently only the SubtaskCommittableManager uses this constructor to create a new + // CommitRequestImpl, so we can increment the metrics here + metricGroup.getNumCommittablesTotalCounter().inc(); Review Comment: Moved. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org