[ https://issues.apache.org/jira/browse/FLINK-35321?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17946678#comment-17946678 ]
Nick M edited comment on FLINK-35321 at 4/23/25 10:24 AM:
----------------------------------------------------------

Same here on 1.20.1, but now it is coming from the CommittableCollector constructor, with this code:
{code:java}
this.metricGroup.setCurrentPendingCommittablesGauge(this::getNumPending);
{code}
The constructor call comes from copying the collector during a snapshot:
{code:java}
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
    super.snapshotState(context);
    // It is important to copy the collector to not mutate the state.
    committableCollectorState.update(Collections.singletonList(committableCollector.copy()));
}
{code}
where copy() creates a new CommittableCollector:
{code:java}
public CommittableCollector<CommT> copy() {
    return new CommittableCollector<>(
            checkpointCommittables.entrySet().stream()
                    .map(e -> Tuple2.of(e.getKey(), e.getValue().copy()))
                    .collect(Collectors.toMap((t) -> t.f0, (t) -> t.f1)),
            metricGroup);
}
{code}


was (Author: JIRAUSER297336):
    Same here on 1.20.1, but now it is coming from the CommittableCollector constructor, with this code:
    {code:java}
    this.metricGroup.setCurrentPendingCommittablesGauge(this::getNumPending);
    {code}

> CheckpointCommittableManagerImpl re-registers pendingCommittables gauge on
> every commit operation
> -------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-35321
>                 URL: https://issues.apache.org/jira/browse/FLINK-35321
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Core, Runtime / Checkpointing, Runtime / Metrics
>    Affects Versions: 1.19.0
>            Reporter: Lennon Yu
>            Priority: Major
>
> Found while testing a home-made Sink implementation that implements
> SupportsCommitter. We observed that, starting from the *second* checkpoint,
> every committer commit is accompanied by the warning log:
> {quote}
> Name collision: Group already contains a Metric with the name
> 'pendingCommittables'.
> {quote}
> Enabling the debugger and tracing the origin of this log took us to
> {{org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManagerImpl}}
> at line 137 of the commit() method:
> {code:java}
> metricGroup.setCurrentPendingCommittablesGauge(() ->
> getPendingRequests(false).size());
> {code}
> It looks like, instead of modifying the value of the gauge, the manager
> class is *re-setting it with a different gauge* on every commit operation, which
> explains the appearance of the warning log shown above.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
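
For illustration only, the name collision described in this issue can be sketched without Flink. Everything below is hypothetical: SketchMetricGroup and GaugeRegistrationSketch are stand-ins invented for this example and are not Flink's API. The sketch assumes that a metric group keeps the first gauge registered under a name and only records a warning for later attempts. Under that assumption, registering a gauge on every commit produces one warning per commit after the first, while registering it once and letting the gauge read mutable state produces none.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical stand-in for a metric group; NOT Flink's API. */
class SketchMetricGroup {
    private final Map<String, Supplier<Integer>> gauges = new HashMap<>();
    private final List<String> warnings = new ArrayList<>();

    /** Assumption: the first gauge under a name is kept; later ones only record a warning. */
    void gauge(String name, Supplier<Integer> gauge) {
        if (gauges.containsKey(name)) {
            warnings.add("Name collision: Group already contains a Metric with the name '"
                    + name + "'.");
            return;
        }
        gauges.put(name, gauge);
    }

    /** Evaluates the gauge registered under the given name. */
    int read(String name) {
        return gauges.get(name).get();
    }

    List<String> warnings() {
        return warnings;
    }
}

/** Hypothetical comparison of the two registration patterns. */
class GaugeRegistrationSketch {
    /** Pattern from the report: a gauge is registered again on every commit. */
    static int warningsWhenRegisteringPerCommit(int commits) {
        SketchMetricGroup group = new SketchMetricGroup();
        List<String> pending = new ArrayList<>();
        for (int i = 0; i < commits; i++) {
            group.gauge("pendingCommittables", pending::size);
        }
        return group.warnings().size();
    }

    /** Alternative: register once; the gauge reads the current size when evaluated. */
    static int[] warningsAndValueWhenRegisteringOnce(int commits) {
        SketchMetricGroup group = new SketchMetricGroup();
        List<String> pending = new ArrayList<>();
        group.gauge("pendingCommittables", pending::size);
        for (int i = 0; i < commits; i++) {
            pending.add("committable-" + i);
        }
        return new int[] {group.warnings().size(), group.read("pendingCommittables")};
    }
}
```

In this sketch the gauge registered once still reports the current number of pending entries, because it is a supplier evaluated on read, not a stored value. Whether the same approach suits CheckpointCommittableManagerImpl or CommittableCollector depends on how their metric group is shared, which this example does not model.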