Thanks Mason for starting this discussion thread. Generally +1 for the
motivation and the proposal.


I have some questions about the details after reading the FLIP.

1. The FLIP says "However, pendingRecords is currently only reported by the 
SourceReader and doesn’t cover the case for sources that only the 
SourceEnumerator can calculate those metrics e.g. bounded split implementations 
like Iceberg." Could you explain in more detail why these metrics cannot be
calculated on the SourceReader side?

2. Minor: it looks like you mixed up SplitEnumeratorMetricGroup and
SourceReaderMetricGroup in the public interfaces part:
@PublicEvolving
public interface SplitEnumeratorMetricGroup extends 
OperatorCoordinatorMetricGroup {
    // IIUC, these methods belong to SourceReader?
    Counter getNumRecordsInErrorsCounter();
    void setPendingBytesGauge(Gauge<Long> pendingBytesGauge);
    void setPendingRecordsGauge(Gauge<Long> pendingRecordsGauge);
     
    /** new addition */
    void setAssignedSplitsGauge(Gauge<Long> assignedSplitsGauge);
}

3. Could you explain the relationship between your proposed PendingRecordsGauge
in SplitEnumerator and the PendingRecordsGauge in SourceReader? For example,
which kinds of connector developers need to care about or implement each of the
two metrics?

4. The context of the discussion thread suggests that you want to introduce a
setAssignedSplits method on the SourceReader side, but the FLIP has not been
updated accordingly yet, e.g. in the implementation part? Also, the final state
looks strange to me: following your design, we calculate the total
assignedSplits in the SplitEnumerator but the total unAssignedSplits on the
SourceReader side. The 'assign' action happens on the SplitEnumerator side and
should surely be managed by the SplitEnumerator, so why would a SourceReader,
which never assigns any splits, need to report its assigned splits?
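
To illustrate what I mean in point 4, here is a minimal sketch of keeping both
counts on the enumerator side. It uses stand-in types only: SimpleGauge,
EnumeratorMetricsSketch and EnumeratorSketch are hypothetical names made up for
this example and are not the actual Flink interfaces or the FLIP's proposed
API. The only assumption is that the component performing the assignment can
expose both numbers as gauges.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical stand-in for a gauge type, kept minimal so the sketch compiles alone.
interface SimpleGauge<T> {
    T getValue();
}

// Hypothetical stand-in for an enumerator-side metric group (not the real Flink interface).
class EnumeratorMetricsSketch {
    private SimpleGauge<Long> unassignedSplitsGauge;
    private SimpleGauge<Long> assignedSplitsGauge;

    void setUnassignedSplitsGauge(SimpleGauge<Long> gauge) {
        this.unassignedSplitsGauge = gauge;
    }

    void setAssignedSplitsGauge(SimpleGauge<Long> gauge) {
        this.assignedSplitsGauge = gauge;
    }

    long unassignedSplits() {
        return unassignedSplitsGauge.getValue();
    }

    long assignedSplits() {
        return assignedSplitsGauge.getValue();
    }
}

// Hypothetical enumerator: it performs the assignment, so it tracks both counts.
class EnumeratorSketch {
    private final Queue<String> pendingSplits = new ArrayDeque<>();
    private long assignedCount = 0;

    EnumeratorSketch(EnumeratorMetricsSketch metrics) {
        // Both gauges read enumerator state; no reader involvement is needed.
        metrics.setUnassignedSplitsGauge(() -> (long) pendingSplits.size());
        metrics.setAssignedSplitsGauge(() -> assignedCount);
    }

    void addSplit(String splitId) {
        pendingSplits.add(splitId);
    }

    // Hands out the next pending split, or returns null if none is pending.
    String assignNext() {
        String split = pendingSplits.poll();
        if (split != null) {
            assignedCount++;
        }
        return split;
    }
}
```

With this shape, every call to assignNext() moves one split from the
unassigned count to the assigned count in a single place, which is the
consistency I would expect from the design.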

Best,
Leonard



> On Nov 17, 2023, at 6:52 AM, Mason Chen <mas.chen6...@gmail.com> wrote:
> 
> Hi all,
> 
> I would like to start a discussion on FLIP-394: Add Metrics for Connector
> Agnostic Autoscaling [1].
> 
> This FLIP recommends adding two metrics to make autoscaling work for
> bounded split source implementations like IcebergSource. These metrics are
> required by the Flink Kubernetes Operator autoscaler algorithm [2] to
> retrieve information for the backlog and the maximum source parallelism.
> The changes would affect the `@PublicEvolving` `SplitEnumeratorMetricGroup`
> API of the source connector framework.
> 
> Best,
> Mason
> 
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-394%3A+Add+Metrics+for+Connector+Agnostic+Autoscaling
> [2]
> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/autoscaler/#limitations
