[ 
https://issues.apache.org/jira/browse/FLINK-35321?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17946678#comment-17946678
 ] 

Nick M edited comment on FLINK-35321 at 4/23/25 10:24 AM:
----------------------------------------------------------

same here, 1.20.1

But now it coming from CommittableCollector constructor with code:
{code:java}
 this.metricGroup.setCurrentPendingCommittablesGauge(this::getNumPending); 
{code}
and constructor call come from copying snapshot
{code:java}
    @Override
    public void snapshotState(StateSnapshotContext context) throws Exception {
        super.snapshotState(context);
        // It is important to copy the collector to not mutate the state.
        
committableCollectorState.update(Collections.singletonList(committableCollector.copy()));
    } {code}
where copy() contain new CommittableCollector:
{code:java}
    public CommittableCollector<CommT> copy() {
        return new CommittableCollector<>(
                checkpointCommittables.entrySet().stream()
                        .map(e -> Tuple2.of(e.getKey(), e.getValue().copy()))
                        .collect(Collectors.toMap((t) -> t.f0, (t) -> t.f1)),
                metricGroup);
    }
 {code}
 


was (Author: JIRAUSER297336):
same here, 1.20.1

But now it coming from CommittableCollector constructor with code:
{code:java}
 this.metricGroup.setCurrentPendingCommittablesGauge(this::getNumPending); 
{code}

> CheckpointCommittableManagerImpl re-registers pendingCommittables gauge on 
> every commit operation
> -------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-35321
>                 URL: https://issues.apache.org/jira/browse/FLINK-35321
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Core, Runtime / Checkpointing, Runtime / Metrics
>    Affects Versions: 1.19.0
>            Reporter: Lennon Yu
>            Priority: Major
>
> Found while testing a home-made Sink implementation that implements 
> SupportCommitter. We observed that starting from the *second* checkpoint, 
> every committer commit will be accompanied by the warning log:
> {quote}
> Name collision: Group already contains a Metric with the name 
> 'pendingCommittables'.
> {quote}
> Enabling the debugger and tracing the origin of this log took us to 
> {{org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManagerImpl}}
>  at line 137 of the commit() method:
> {code:java}
> metricGroup.setCurrentPendingCommittablesGauge(() -> 
> getPendingRequests(false).size());
> {code}
> It looks like that instead of modifying the value of the gauge, the manager 
> class is *re-setting with a different guage* on every commit operation, which 
> explains the appearance of the warning log shown above.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to