wenbingshen commented on code in PR #827:
URL: https://github.com/apache/flink-kubernetes-operator/pull/827#discussion_r1634118107
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java:
##########
@@ -247,17 +249,36 @@ protected JobTopology getJobTopology(JobDetailsInfo jobDetailsInfo) {
                 json, slotSharingGroupIdMap, maxParallelismMap, metrics, finished);
     }
-    private void updateKafkaSourceMaxParallelisms(Context ctx, JobID jobId, JobTopology topology)
-            throws Exception {
+    private void updateKafkaPulsarSourceMaxParallelisms(
+            Context ctx, JobID jobId, JobTopology topology) throws Exception {
         try (var restClient = ctx.getRestClusterClient()) {
-            var partitionRegex = Pattern.compile("^.*\\.partition\\.\\d+\\.currentOffset$");
+            Pattern partitionRegex =
+                    Pattern.compile(
+                            "^.*\\.KafkaSourceReader\\.topic\\.(?<kafkaTopic>.+)\\.partition\\.(?<kafkaId>\\d+)\\.currentOffset$"
+                                    + "|^.*\\.PulsarConsumer\\.(?<pulsarTopic>.+)-partition-(?<pulsarId>\\d+)\\..*\\.numMsgsReceived$");
             for (var vertexInfo : topology.getVertexInfos().values()) {
                 if (vertexInfo.getInputs().isEmpty()) {
                     var sourceVertex = vertexInfo.getId();
                     var numPartitions =
                             queryAggregatedMetricNames(restClient, jobId, sourceVertex).stream()
-                                    .filter(partitionRegex.asMatchPredicate())
-                                    .count();
+                                    .map(
+                                            v -> {
+                                                Matcher matcher = partitionRegex.matcher(v);
+                                                if (matcher.matches()) {
+                                                    String kafkaTopic = matcher.group("kafkaTopic");
+                                                    String kafkaId = matcher.group("kafkaId");
+                                                    String pulsarTopic =
+                                                            matcher.group("pulsarTopic");
+                                                    String pulsarId = matcher.group("pulsarId");
+                                                    return kafkaTopic != null
+                                                            ? kafkaTopic + "-" + kafkaId
+                                                            : pulsarTopic + "-" + pulsarId;

Review Comment:
@gyfora @1996fanrui I think the existing matching logic is not too complicated, and the PR is not too large.
If max-parallelism detection is extended to other source connectors in the future, it will not be too late to split the logic then. If we split the Kafka and Pulsar matching now, we would likely need two separate regex passes over the source vertex's metric names to count the partitions of each system, which does not seem necessary. I also don't see a simpler way to express this than an OR match: the current regular expression is a single alternation (|) with one branch per connector, and the map function tells the two branches apart by checking which named group matched. If you have a better approach, please let me know, thanks! Otherwise, I suggest keeping the logic as it is in this PR.

-- 
This is an automated message from the Apache Git Service.
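For illustration, here is a minimal standalone sketch of the single-pass approach, reusing the regex from the diff above. The class and helper names (`PartitionCountSketch`, `countPartitions`, `toPartitionKey`) and the sample metric names are hypothetical, not taken from the PR or from the connectors' real metric output. Because the diff hunk is cut off after the `map` step, the final filter/distinct/count step is an assumption about how the keys become a partition count.

```java
import java.util.List;
import java.util.Objects;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class PartitionCountSketch {

    // Same alternation as in the PR: one branch for Kafka, one for Pulsar.
    // Java rejects duplicate group names, so each branch uses its own names.
    static final Pattern PARTITION_REGEX =
            Pattern.compile(
                    "^.*\\.KafkaSourceReader\\.topic\\.(?<kafkaTopic>.+)\\.partition\\.(?<kafkaId>\\d+)\\.currentOffset$"
                            + "|^.*\\.PulsarConsumer\\.(?<pulsarTopic>.+)-partition-(?<pulsarId>\\d+)\\..*\\.numMsgsReceived$");

    // Hypothetical helper: maps each metric name to a "topic-partition" key,
    // drops names that match neither branch, and counts the distinct keys.
    static long countPartitions(List<String> metricNames) {
        return metricNames.stream()
                .map(PartitionCountSketch::toPartitionKey)
                .filter(Objects::nonNull)
                .distinct()
                .count();
    }

    // Hypothetical helper: returns "topic-id" for whichever branch matched,
    // or null if the name matches neither branch.
    static String toPartitionKey(String metricName) {
        Matcher matcher = PARTITION_REGEX.matcher(metricName);
        if (!matcher.matches()) {
            return null;
        }
        // Named groups of the branch that did not participate return null.
        String kafkaTopic = matcher.group("kafkaTopic");
        return kafkaTopic != null
                ? kafkaTopic + "-" + matcher.group("kafkaId")
                : matcher.group("pulsarTopic") + "-" + matcher.group("pulsarId");
    }

    public static void main(String[] args) {
        // Hypothetical metric names, for illustration only.
        List<String> names =
                List.of(
                        "Source__Kafka.KafkaSourceReader.topic.orders.partition.0.currentOffset",
                        "Source__Kafka.KafkaSourceReader.topic.orders.partition.1.currentOffset",
                        "Source__Pulsar.PulsarConsumer.events-partition-0.consumer-a.numMsgsReceived",
                        "Source__Pulsar.PulsarConsumer.events-partition-0.consumer-b.numMsgsReceived",
                        "Source__Kafka.numRecordsIn");
        System.out.println(countPartitions(names)); // prints 3
    }
}
```

A single pass over the metric names is enough because the key itself records which branch matched. Counting distinct topic-partition keys rather than raw matches presumably avoids double-counting a Pulsar partition that reports numMsgsReceived under more than one consumer-specific metric name.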