wenbingshen commented on code in PR #827:
URL: https://github.com/apache/flink-kubernetes-operator/pull/827#discussion_r1634118107


##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java:
##########
@@ -247,17 +249,36 @@ protected JobTopology getJobTopology(JobDetailsInfo jobDetailsInfo) {
                 json, slotSharingGroupIdMap, maxParallelismMap, metrics, finished);
     }
 
-    private void updateKafkaSourceMaxParallelisms(Context ctx, JobID jobId, JobTopology topology)
-            throws Exception {
+    private void updateKafkaPulsarSourceMaxParallelisms(
+            Context ctx, JobID jobId, JobTopology topology) throws Exception {
         try (var restClient = ctx.getRestClusterClient()) {
-            var partitionRegex = Pattern.compile("^.*\\.partition\\.\\d+\\.currentOffset$");
+            Pattern partitionRegex =
+                    Pattern.compile(
+                            "^.*\\.KafkaSourceReader\\.topic\\.(?<kafkaTopic>.+)\\.partition\\.(?<kafkaId>\\d+)\\.currentOffset$"
+                                    + "|^.*\\.PulsarConsumer\\.(?<pulsarTopic>.+)-partition-(?<pulsarId>\\d+)\\..*\\.numMsgsReceived$");
             for (var vertexInfo : topology.getVertexInfos().values()) {
                 if (vertexInfo.getInputs().isEmpty()) {
                     var sourceVertex = vertexInfo.getId();
                     var numPartitions =
                             queryAggregatedMetricNames(restClient, jobId, sourceVertex).stream()
-                                    .filter(partitionRegex.asMatchPredicate())
-                                    .count();
+                                    .map(
+                                            v -> {
+                                                Matcher matcher = partitionRegex.matcher(v);
+                                                if (matcher.matches()) {
+                                                    String kafkaTopic = matcher.group("kafkaTopic");
+                                                    String kafkaId = matcher.group("kafkaId");
+                                                    String pulsarTopic =
+                                                            matcher.group("pulsarTopic");
+                                                    String pulsarId = matcher.group("pulsarId");
+                                                    return kafkaTopic != null
+                                                            ? kafkaTopic + "-" + kafkaId
+                                                            : pulsarTopic + "-" + pulsarId;

Review Comment:
   @gyfora @1996fanrui 
   I think the current matching logic is not overly complicated, and this PR is not too large. If max-parallelism detection for other source connectors needs to be added in the future, it will not be too late to split the logic then.
   
   If we split the Kafka and Pulsar matching logic now, we would have to run two separate regex passes over the source metric names to count the partitions of the two systems independently, which does not seem necessary.
   
   I don't see a simpler way to express the "or" matching: the current regex uses alternation, and the map function then distinguishes the two cases via the named capturing groups. If you have a better approach, please let me know, thank you.
   
   I suggest keeping the logic of the existing PR.
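   To illustrate the alternation-plus-named-groups approach discussed above, here is a minimal standalone sketch. The class name, helper method, and metric names are hypothetical examples for demonstration, not code or metrics from the Flink codebase; only the regex itself is taken from the diff.
   
   ```java
   import java.util.List;
   import java.util.Optional;
   import java.util.regex.Matcher;
   import java.util.regex.Pattern;
   
   public class PartitionRegexSketch {
   
       // Combined pattern from the PR: the first alternative matches Kafka
       // currentOffset metrics, the second matches Pulsar numMsgsReceived metrics.
       static final Pattern PARTITION_REGEX =
               Pattern.compile(
                       "^.*\\.KafkaSourceReader\\.topic\\.(?<kafkaTopic>.+)\\.partition\\.(?<kafkaId>\\d+)\\.currentOffset$"
                               + "|^.*\\.PulsarConsumer\\.(?<pulsarTopic>.+)-partition-(?<pulsarId>\\d+)\\..*\\.numMsgsReceived$");
   
       // Maps one metric name to a "topic-partition" key, mirroring the map()
       // lambda in the diff. Groups of the unmatched alternative return null,
       // which is how the Kafka and Pulsar cases are distinguished.
       static Optional<String> toPartitionKey(String metricName) {
           Matcher m = PARTITION_REGEX.matcher(metricName);
           if (!m.matches()) {
               return Optional.empty();
           }
           String kafkaTopic = m.group("kafkaTopic");
           return Optional.of(
                   kafkaTopic != null
                           ? kafkaTopic + "-" + m.group("kafkaId")
                           : m.group("pulsarTopic") + "-" + m.group("pulsarId"));
       }
   
       public static void main(String[] args) {
           // Hypothetical metric names; a single pass over the list handles both
           // systems, which is the point argued above.
           List<String> metrics =
                   List.of(
                           "op.KafkaSourceReader.topic.orders.partition.0.currentOffset",
                           "op.KafkaSourceReader.topic.orders.partition.1.currentOffset",
                           "op.PulsarConsumer.events-partition-0.consumer1.numMsgsReceived",
                           "op.someUnrelatedMetric");
           long numPartitions =
                   metrics.stream()
                           .map(PartitionRegexSketch::toPartitionKey)
                           .flatMap(Optional::stream)
                           .distinct()
                           .count();
           System.out.println(numPartitions); // 3 distinct topic-partitions
       }
   }
   ```
   
   A split implementation would need one such stream pass per regex, whereas the alternation keeps it to a single pass.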



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
