[ 
https://issues.apache.org/jira/browse/FLINK-36192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17878679#comment-17878679
 ] 

yuanfenghu commented on FLINK-36192:
------------------------------------

> To fix the issue, we may have to add an explicit metric for the number of 
> source partitions. I don't really see other options because we need both the 
> maximum parallelism and the number of source partitions to perform the key 
> alignment. We can't lower the maximum parallelism like we have previously 
> done when we want to support use cases where the number of partitions exceeds 
> the maximum parallelism, and we also need to ensure the maximum parallelism 
> is preserved to prevent exceeding it.

+1.
We need to add a partition-count metric to the source so that the autoscaler 
can later determine the parallelism in two ways: key alignment and partition 
alignment. Key alignment can continue to use the existing logic; see the 
[alignment logic| 
https://github.com/apache/flink-kubernetes-operator/blob/d8568ae28b13f5cc649a83f174dbc88449f0c602/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java#L299].
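
For reference, here is a minimal sketch of the key alignment idea as I understand it: starting from the computed parallelism, search upwards for a value that divides the max parallelism, and fall back to the computed value if none is found at or below maxParallelism / 2. The class name, method name, and parameter names are made up for illustration, and this is a simplification rather than a copy of the code in JobVertexScaler.

```java
public final class KeyAlignmentSketch {

    /**
     * Hypothetical helper. Assumes newParallelism >= 1.
     * Returns the smallest p >= newParallelism that divides maxParallelism,
     * searching only up to min(maxParallelism / 2, upperBound).
     * If no such p exists, returns newParallelism unchanged.
     */
    public static int align(int newParallelism, int maxParallelism, int upperBound) {
        for (int p = newParallelism; p <= maxParallelism / 2 && p <= upperBound; p++) {
            if (maxParallelism % p == 0) {
                return p; // key groups are spread evenly across p subtasks
            }
        }
        return newParallelism; // no aligned value found, keep the computed one
    }

    public static void main(String[] args) {
        System.out.println(align(20, 64, 64)); // 32
        System.out.println(align(48, 64, 64)); // 48, since 48 > 64 / 2
    }
}
```

The second call corresponds to the 64-partition example in the issue description: once the computed value is above half of the max parallelism, no adjustment happens.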
 
Partition alignment may need different rules from key alignment.

When the parallelism P1 calculated by the autoscaler is greater than 
partitions/2, there are two cases:

1. When the number of partitions is less than the max parallelism, use the 
number of partitions as the parallelism (I think this is optimal).

2. When the number of partitions is greater than the max parallelism, there are 
three ways to select the final parallelism: {color:#de350b}use P1 as the 
final parallelism{color}, {color:#4c9aff}use partitions/2 as the final 
parallelism (this ensures that the partitions are aligned){color}, 
{color:#ffbdad}or use the max parallelism as the final parallelism{color}.
I'm not sure which of these three is best. I lean towards the second one 
({color:#4c9aff}partitions/2 as the final parallelism{color}).
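
To make the two cases concrete, here is a rough sketch of the selection. Everything in it is hypothetical (class, enum, and method names), it is not existing autoscaler code, and it makes three assumptions that are not settled above: partitions equal to the max parallelism is treated like case 1, partitions/2 uses integer division, and partitions/2 is capped at the max parallelism.

```java
public final class PartitionAlignmentSketch {

    /** The three candidate strategies for case 2 (hypothetical names). */
    public enum Option { KEEP_P1, HALF_PARTITIONS, MAX_PARALLELISM }

    /**
     * Hypothetical helper: picks the final source parallelism when
     * p1 > numPartitions / 2. For smaller p1 it returns p1 unchanged,
     * because that situation is not covered by the two cases.
     */
    public static int choose(int p1, int numPartitions, int maxParallelism, Option option) {
        if (p1 <= numPartitions / 2) {
            return p1; // outside the two cases discussed here
        }
        if (numPartitions <= maxParallelism) {
            return numPartitions; // case 1: one subtask per partition
        }
        switch (option) { // case 2: partitions exceed the max parallelism
            case KEEP_P1:
                return p1;
            case HALF_PARTITIONS:
                // Assumption: never exceed the max parallelism.
                return Math.min(numPartitions / 2, maxParallelism);
            default:
                return maxParallelism;
        }
    }

    public static void main(String[] args) {
        System.out.println(choose(48, 64, 128, Option.KEEP_P1));          // 64
        System.out.println(choose(110, 200, 128, Option.KEEP_P1));        // 110
        System.out.println(choose(110, 200, 128, Option.HALF_PARTITIONS)); // 100
        System.out.println(choose(110, 200, 128, Option.MAX_PARALLELISM)); // 128
    }
}
```

With 200 partitions, a max parallelism of 128, and P1 = 110, the three options give 110, 100, and 128 respectively. Note that the second option ends up below P1 in this example.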

> Optimize the logic to make it the common divisor of the partition number of 
> the data source when determining the parallelism of the source task.
> ------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-36192
>                 URL: https://issues.apache.org/jira/browse/FLINK-36192
>             Project: Flink
>          Issue Type: Improvement
>          Components: Autoscaler
>            Reporter: yuanfenghu
>            Priority: Minor
>
> *Description:*
> When the number of Kafka partitions is known, we would like the parallelism 
> of the tasks that consume Kafka to be a divisor of the partition count 
> wherever possible, so that the partitions are spread evenly across the 
> consuming tasks.
>  
> {*}current logic{*}:
> Currently, the parallelism of tasks in the autoscaler is determined as 
> follows:
> step1: Calculate the parallelism p1 that corresponds to the task's target 
> processing rate.
> step2: Starting from p1, look for a value p2 that divides the operator's 
> maximum parallelism and does not exceed maxParallelism / 2. If such a p2 
> exists, use p2 as the final parallelism. Otherwise, use p1 as the final 
> parallelism.
> If the task consumes Kafka or Pulsar, its maximum parallelism is determined 
> first: if the number of partitions is less than the current maximum 
> parallelism of the task, the maximum parallelism is set to the number of 
> Kafka or Pulsar partitions; otherwise the maximum parallelism remains 
> unchanged. This leads to the following situations:
> When the number of Kafka or Pulsar partitions is less than the maximum 
> parallelism of the operator:
> 1. If the parallelism calculated in step 1 < the number of Kafka or Pulsar 
> partitions / 2, the requirement is met and the partitions can be balanced 
> across tasks.
> 2. If the parallelism calculated in step 1 > the number of Kafka or Pulsar 
> partitions / 2, the parallelism calculated in step 1 is used and 
> consumption becomes unbalanced. For example, if Kafka has 64 partitions and 
> the parallelism calculated in step 1 is 48, the final task parallelism is 
> 48.
> When the number of Kafka or Pulsar partitions is greater than the maximum 
> parallelism of the operator:
> The parallelism is calculated entirely according to the steps above. For 
> example, if one of my Kafka topics has 200 partitions and the maximum 
> parallelism of the operator is 128, the calculated parallelism is 2, 4, 8, 
> 16, ... so it is very likely that Kafka cannot be consumed evenly.
>  
> {*}expected logic{*}:
>  * When the number of partitions is less than the maximum parallelism, 
> choose the task parallelism to be a divisor of the number of partitions.
>  * When the number of partitions is greater than the maximum parallelism, 
> choose the task parallelism to be a divisor of the number of partitions 
> that does not exceed the maximum parallelism.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
