Thanks, Mason, for starting this discussion thread. Generally +1 for the motivation and the proposal.
I have some questions about the details after reading the FLIP.

1. The FLIP says "However, pendingRecords is currently only reported by the SourceReader and doesn’t cover the case for sources that only the SourceEnumerator can calculate those metrics e.g. bounded split implementations like Iceberg." Could you explain in more detail why we cannot calculate these metrics on the SourceReader side?

2. Minor: it looks like you mixed up SplitEnumeratorMetricGroup and SourceReaderMetricGroup in the Public Interfaces section:

    @PublicEvolving
    public interface SplitEnumeratorMetricGroup extends OperatorCoordinatorMetricGroup {
        // IIUC, these methods belong to SourceReader?
        Counter getNumRecordsInErrorsCounter();
        void setPendingBytesGauge(Gauge<Long> pendingBytesGauge);
        void setPendingRecordsGauge(Gauge<Long> pendingRecordsGauge);

        /** new addition */
        void setAssignedSplitsGauge(Gauge<Long> assignedSplitsGauge);
    }

3. Could you explain the relation between the PendingRecordsGauge you propose for the SplitEnumerator and the existing PendingRecordsGauge in the SourceReader? E.g., which kinds of connector developers need to care about, or implement, each of the two metrics?

4. The context of the discussion thread shows that you want to introduce a setAssignedSplits method on the SourceReader side, but the FLIP, e.g. the implementation part, hasn't been updated yet? Also, the final state looks strange to me: following your design, we would calculate the total assignedSplits in the SplitEnumerator but the total unAssignedSplits on the SourceReader side. The 'assign' action happens on the SplitEnumerator side and should certainly be managed by the SplitEnumerator, so why would a SourceReader, which never assigns any splits, need to report its assigned splits?

Best,
Leonard

> On Nov 17, 2023, at 6:52 AM, Mason Chen <mas.chen6...@gmail.com> wrote:
>
> Hi all,
>
> I would like to start a discussion on FLIP-394: Add Metrics for Connector
> Agnostic Autoscaling [1].
>
> This FLIP recommends adding two metrics to make autoscaling work for
> bounded split source implementations like IcebergSource.
> These metrics are required by the Flink Kubernetes Operator autoscaler
> algorithm [2] to retrieve information for the backlog and the maximum
> source parallelism. The changes would affect the `@PublicEvolving`
> `SplitEnumeratorMetricGroup` API of the source connector framework.
>
> Best,
> Mason
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-394%3A+Add+Metrics+for+Connector+Agnostic+Autoscaling
> [2] https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/autoscaler/#limitations
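For illustration only, here is a minimal, self-contained Java sketch of the split accounting behind questions 3 and 4. It assumes a bounded source whose splits carry a known record count up front (as file-based splits such as Iceberg's can). `Gauge` merely mirrors the single-method shape of Flink's `org.apache.flink.metrics.Gauge`, while `ProposedEnumeratorMetrics`, `BoundedSplit` and `SketchEnumerator` are hypothetical stand-ins, not Flink classes. Only the two setter names come from the interface snippet in question 2.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Stand-in with the same single-method shape as org.apache.flink.metrics.Gauge. */
interface Gauge<T> {
    T getValue();
}

/** Hypothetical: only the two gauge setters from the FLIP snippet, not Flink's real interface. */
interface ProposedEnumeratorMetrics {
    void setPendingRecordsGauge(Gauge<Long> pendingRecordsGauge);

    void setAssignedSplitsGauge(Gauge<Long> assignedSplitsGauge);
}

/** Hypothetical bounded split whose record count is known up front (e.g. from file metadata). */
final class BoundedSplit {
    final String id;
    final long recordCount;

    BoundedSplit(String id, long recordCount) {
        this.id = id;
        this.recordCount = recordCount;
    }
}

/**
 * Hypothetical, simplified enumerator. It discovers every split and performs every
 * assignment, so it can report both the records still waiting in unassigned splits
 * and how many splits it has handed out to readers.
 */
final class SketchEnumerator {
    private final Deque<BoundedSplit> unassigned = new ArrayDeque<>();
    private final List<BoundedSplit> assigned = new ArrayList<>();

    SketchEnumerator(List<BoundedSplit> discovered, ProposedEnumeratorMetrics metrics) {
        unassigned.addAll(discovered);
        // Gauges are evaluated lazily, so each read reflects the current state.
        metrics.setPendingRecordsGauge(this::pendingRecords);
        metrics.setAssignedSplitsGauge(() -> (long) assigned.size());
    }

    /** Records in splits that no reader has received yet. */
    long pendingRecords() {
        long total = 0;
        for (BoundedSplit split : unassigned) {
            total += split.recordCount;
        }
        return total;
    }

    /** Hands the next split to a reader, or returns null when none is left. */
    BoundedSplit assignNext() {
        BoundedSplit next = unassigned.pollFirst();
        if (next != null) {
            assigned.add(next);
        }
        return next;
    }
}
```

In this sketch, only the enumerator observes every assignment, so both the assigned-split count and the backlog of not-yet-assigned splits fall out of its own state. A reader, by contrast, only sees the splits it already holds, which is why a reader-side and an enumerator-side pendingRecords gauge would describe different parts of the backlog.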