[jira] [Created] (FLINK-35823) Introduce parameters to control the upper limit of rescale to avoid unlimited shrinkage due to server-side bottlenecks or data skew.

2024-07-11 Thread yuanfenghu (Jira)
yuanfenghu created FLINK-35823:
--

 Summary: Introduce parameters to control the upper limit of 
rescale to avoid unlimited shrinkage due to server-side bottlenecks or data 
skew.
 Key: FLINK-35823
 URL: https://issues.apache.org/jira/browse/FLINK-35823
 Project: Flink
  Issue Type: Improvement
  Components: Autoscaler
Reporter: yuanfenghu
 Fix For: 2.0.0


1. If a Flink application writes data to an external storage system such as 
HDFS or Kafka, and that external system becomes the bottleneck of the whole 
job (for example, HDFS throughput drops), the write I/O time will increase 
and the corresponding Flink busy metric will rise as well. The autoscaler 
will then decide that the parallelism needs to be increased to raise the 
write rate. However, since the bottleneck is on the external server side, 
this will not help, and every subsequent evaluation cycle will keep 
increasing the parallelism until parallelism = max-parallelism.

2. Data skew in some tasks causes the same problem.

 
Therefore, we should introduce a new parameter and check: if throughput stays 
essentially flat while parallelism keeps increasing, there is no need to 
scale up any further.
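
Below is a minimal, hedged sketch of the kind of guard this proposes. The 
class, method, and threshold are illustrative assumptions, not the 
autoscaler's actual API:
{code:java}
// Hypothetical guard: if the last scale-up raised parallelism but throughput
// stayed roughly flat, stop scaling up (likely external bottleneck or skew).
public final class ScaleUpGuard {

    // Assumed option: minimum relative throughput gain to keep scaling up.
    private static final double MIN_THROUGHPUT_GAIN = 0.05; // 5%

    /** Assumes throughputBefore > 0. */
    public static boolean shouldScaleUp(
            double throughputBefore, double throughputAfter,
            int parallelismBefore, int parallelismAfter) {
        if (parallelismAfter <= parallelismBefore) {
            return true; // no previous scale-up to evaluate
        }
        double gain = (throughputAfter - throughputBefore) / throughputBefore;
        return gain >= MIN_THROUGHPUT_GAIN;
    }

    public static void main(String[] args) {
        // External bottleneck: parallelism went 5 -> 10, throughput ~flat.
        System.out.println(shouldScaleUp(1000.0, 1010.0, 5, 10)); // false
    }
}
{code}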
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35823) Introduce parameters to control the upper limit of rescale to avoid unlimited 扩容 (expansion) due to server-side bottlenecks or data skew.

2024-07-11 Thread yuanfenghu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35823?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuanfenghu updated FLINK-35823:
---
Summary: Introduce parameters to control the upper limit of rescale to 
avoid unlimited 扩容 (expansion) due to server-side bottlenecks or data skew.  (was: 
Introduce parameters to control the upper limit of rescale to avoid unlimited 
shrinkage due to server-side bottlenecks or data skew.)

> Introduce parameters to control the upper limit of rescale to avoid unlimited 
> 扩容 (expansion) due to server-side bottlenecks or data skew.
> -
>
> Key: FLINK-35823
> URL: https://issues.apache.org/jira/browse/FLINK-35823
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler
>Reporter: yuanfenghu
>Priority: Major
> Fix For: 2.0.0
>
>
> 1. If a Flink application writes data to an external storage system such as 
> HDFS or Kafka, and that external system becomes the bottleneck of the whole 
> job (for example, HDFS throughput drops), the write I/O time will increase 
> and the corresponding Flink busy metric will rise as well. The autoscaler 
> will then decide that the parallelism needs to be increased to raise the 
> write rate. However, since the bottleneck is on the external server side, 
> this will not help, and every subsequent evaluation cycle will keep 
> increasing the parallelism until parallelism = max-parallelism.
> 2. Data skew in some tasks causes the same problem.
>  
> Therefore, we should introduce a new parameter and check: if throughput 
> stays essentially flat while parallelism keeps increasing, there is no need 
> to scale up any further.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35823) Introduce parameters to control the upper limit of rescale to avoid unlimited expansion due to server-side bottlenecks or data skew.

2024-07-11 Thread yuanfenghu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35823?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuanfenghu updated FLINK-35823:
---
Summary: Introduce parameters to control the upper limit of rescale to 
avoid unlimited expansion due to server-side bottlenecks or data skew.  (was: 
Introduce parameters to control the upper limit of rescale to avoid unlimited 
扩容 (expansion) due to server-side bottlenecks or data skew.)

> Introduce parameters to control the upper limit of rescale to avoid unlimited 
> expansion due to server-side bottlenecks or data skew.
> 
>
> Key: FLINK-35823
> URL: https://issues.apache.org/jira/browse/FLINK-35823
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler
>Reporter: yuanfenghu
>Priority: Major
> Fix For: 2.0.0
>
>
> 1. If a Flink application writes data to an external storage system such as 
> HDFS or Kafka, and that external system becomes the bottleneck of the whole 
> job (for example, HDFS throughput drops), the write I/O time will increase 
> and the corresponding Flink busy metric will rise as well. The autoscaler 
> will then decide that the parallelism needs to be increased to raise the 
> write rate. However, since the bottleneck is on the external server side, 
> this will not help, and every subsequent evaluation cycle will keep 
> increasing the parallelism until parallelism = max-parallelism.
> 2. Data skew in some tasks causes the same problem.
>  
> Therefore, we should introduce a new parameter and check: if throughput 
> stays essentially flat while parallelism keeps increasing, there is no need 
> to scale up any further.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35823) Introduce parameters to control the upper limit of rescale to avoid unlimited expansion due to server-side bottlenecks or data skew.

2024-07-11 Thread yuanfenghu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17865280#comment-17865280
 ] 

yuanfenghu commented on FLINK-35823:


I have discussed this issue with [~fanrui]. I wonder whether others in the 
community have any suggestions on this?
 

> Introduce parameters to control the upper limit of rescale to avoid unlimited 
> expansion due to server-side bottlenecks or data skew.
> 
>
> Key: FLINK-35823
> URL: https://issues.apache.org/jira/browse/FLINK-35823
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler
>Reporter: yuanfenghu
>Priority: Major
> Fix For: 2.0.0
>
>
> 1. If a Flink application writes data to an external storage system such as 
> HDFS or Kafka, and that external system becomes the bottleneck of the whole 
> job (for example, HDFS throughput drops), the write I/O time will increase 
> and the corresponding Flink busy metric will rise as well. The autoscaler 
> will then decide that the parallelism needs to be increased to raise the 
> write rate. However, since the bottleneck is on the external server side, 
> this will not help, and every subsequent evaluation cycle will keep 
> increasing the parallelism until parallelism = max-parallelism.
> 2. Data skew in some tasks causes the same problem.
>  
> Therefore, we should introduce a new parameter and check: if throughput 
> stays essentially flat while parallelism keeps increasing, there is no need 
> to scale up any further.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35035) Reduce job pause time when cluster resources are expanded in adaptive mode

2024-07-14 Thread yuanfenghu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17865776#comment-17865776
 ] 

yuanfenghu commented on FLINK-35035:


[~ztison] 
Thanks, I will look into the FLIP in detail.

> Reduce job pause time when cluster resources are expanded in adaptive mode
> --
>
> Key: FLINK-35035
> URL: https://issues.apache.org/jira/browse/FLINK-35035
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Affects Versions: 1.19.0
>Reporter: yuanfenghu
>Priority: Minor
>
> When 'jobmanager.scheduler = adaptive', job graph changes triggered by 
> cluster expansion cause long periods of task stagnation. We should reduce 
> this impact.
> As an example:
> I have a jobgraph: [v1 (maxp=10, minp=1)] -> [v2 (maxp=10, minp=1)]
> When my cluster has 5 slots, the job will be executed as [v1 p5] -> [v2 p5].
> When I add slots, the job graph change is triggered by
> org.apache.flink.runtime.scheduler.adaptive.ResourceListener#onNewResourcesAvailable.
> However, the five new slots I added are not discovered at the same time (for 
> convenience, I assume that a taskmanager has one slot): no matter what 
> environment we add resources in, we cannot guarantee that all new slots are 
> added at once, so onNewResourcesAvailable triggers repeatedly. If each new 
> slot arrives at a certain interval, the jobgraph will keep changing during 
> this period. What I hope for is a configurable stabilization time for 
> cluster resources: the jobgraph change should only be triggered after the 
> number of cluster slots has been stable for a certain period of time, to 
> avoid this situation.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35851) autoscaler standalone mode should use `GlobalConfiguration.loadConfiguration` to load config

2024-07-15 Thread yuanfenghu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35851?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuanfenghu updated FLINK-35851:
---
Description: We should use

> autoscaler standalone mode should use `GlobalConfiguration.loadConfiguration` 
> to load config
> -
>
> Key: FLINK-35851
> URL: https://issues.apache.org/jira/browse/FLINK-35851
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler
>Reporter: yuanfenghu
>Priority: Minor
>
> We should use



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35851) autoscaler standalone mode should use `GlobalConfiguration.loadConfiguration` to load config

2024-07-15 Thread yuanfenghu (Jira)
yuanfenghu created FLINK-35851:
--

 Summary: autoscaler standalone mode should use 
`GlobalConfiguration.loadConfiguration` to load config
 Key: FLINK-35851
 URL: https://issues.apache.org/jira/browse/FLINK-35851
 Project: Flink
  Issue Type: Improvement
  Components: Autoscaler
Reporter: yuanfenghu






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35851) autoscaler standalone mode should use `GlobalConfiguration.loadConfiguration` to load config

2024-07-15 Thread yuanfenghu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35851?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuanfenghu updated FLINK-35851:
---
Description: 
We should use GlobalConfiguration.loadConfiguration to load the autoscaler 
configuration and keep it consistent with other components.
 

  was: We should use


> autoscaler standalone mode should use `GlobalConfiguration.loadConfiguration` 
> to load config
> -
>
> Key: FLINK-35851
> URL: https://issues.apache.org/jira/browse/FLINK-35851
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler
>Reporter: yuanfenghu
>Priority: Minor
>
> We should use GlobalConfiguration.loadConfiguration to load the autoscaler 
> configuration and keep it consistent with other components.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35851) autoscaler standalone mode should use `GlobalConfiguration.loadConfiguration` to load config

2024-07-16 Thread yuanfenghu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17866626#comment-17866626
 ] 

yuanfenghu commented on FLINK-35851:


[~fanrui] Thanks for the reply. 

My idea is the same as yours: we need to load the configuration from the 
config file first, and then the parameters passed to main will override those 
in the config file.
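
A minimal sketch of that loading order, assuming an illustrative "key=value" 
argument format (the class name and the parsing below are assumptions, not 
the actual standalone autoscaler code):
{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;

public final class StandaloneAutoscalerConfigLoading {

    public static Configuration load(String[] mainArgs) {
        // 1) Load the config file (from FLINK_CONF_DIR), consistent with
        //    other Flink components.
        Configuration conf = GlobalConfiguration.loadConfiguration();
        // 2) Parameters passed to main() override the file values
        //    (assumed "key=value" arguments, for illustration only).
        for (String arg : mainArgs) {
            int i = arg.indexOf('=');
            if (i > 0) {
                conf.setString(arg.substring(0, i), arg.substring(i + 1));
            }
        }
        return conf;
    }
}
{code}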
 

> autoscaler standalone mode should use `GlobalConfiguration.loadConfiguration` 
> to load config
> -
>
> Key: FLINK-35851
> URL: https://issues.apache.org/jira/browse/FLINK-35851
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler
>Reporter: yuanfenghu
>Assignee: yuanfenghu
>Priority: Minor
>
> We should use GlobalConfiguration.loadConfiguration to load the autoscaler 
> configuration and keep it consistent with other components.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-35851) autoscaler standalone mode should use `GlobalConfiguration.loadConfiguration` to load config

2024-07-16 Thread yuanfenghu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17866626#comment-17866626
 ] 

yuanfenghu edited comment on FLINK-35851 at 7/17/24 6:05 AM:
-

[~fanrui] Thanks for the reply.

 
Although the autoscaler does not have an independent dist directory, it is 
quite easy to mount a configuration file in a container environment.

My idea is the same as yours: we need to load the configuration from the 
config file first, and then the parameters passed to main will override those 
in the config file.
 


was (Author: JIRAUSER296932):
[~fanrui] Thanks for the reply. 

My idea is the same as yours: we need to load the configuration from the 
config file first, and then the parameters passed to main will override those 
in the config file.
 

> autoscaler standalone mode should use `GlobalConfiguration.loadConfiguration` 
> to load config
> -
>
> Key: FLINK-35851
> URL: https://issues.apache.org/jira/browse/FLINK-35851
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler
>Reporter: yuanfenghu
>Assignee: yuanfenghu
>Priority: Minor
>
> We should use GlobalConfiguration.loadConfiguration to load the autoscaler 
> configuration and keep it consistent with other components.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35896) eventHandler in RescaleApiScalingRealizer should handle the event of scaling failure

2024-07-25 Thread yuanfenghu (Jira)
yuanfenghu created FLINK-35896:
--

 Summary: eventHandler in RescaleApiScalingRealizer should handle the 
event of scaling failure
 Key: FLINK-35896
 URL: https://issues.apache.org/jira/browse/FLINK-35896
 Project: Flink
  Issue Type: Improvement
  Components: Autoscaler
Affects Versions: 2.0.0
Reporter: yuanfenghu


When using flink-autoscaler-standalone, if a failure occurs during the scaling 
process, the failure event needs to be handled through the eventHandler.
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35896) eventHandler in RescaleApiScalingRealizer should handle the event of scaling failure

2024-07-25 Thread yuanfenghu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17868629#comment-17868629
 ] 

yuanfenghu commented on FLINK-35896:


[~fanrui] 
Can you give me some advice on this jira? Thank you very much.
 

> eventHandler in RescaleApiScalingRealizer should handle the event of scaling failure
> --
>
> Key: FLINK-35896
> URL: https://issues.apache.org/jira/browse/FLINK-35896
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler
>Affects Versions: 2.0.0
>Reporter: yuanfenghu
>Priority: Minor
>
> When using flink-autoscaler-standalone, if a failure occurs during the 
> scaling process, the failure event needs to be handled through the eventHandler.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-35896) eventHandler in RescaleApiScalingRealizer should handle the event of scaling failure

2024-07-25 Thread yuanfenghu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17868631#comment-17868631
 ] 

yuanfenghu edited comment on FLINK-35896 at 7/25/24 10:23 AM:
--

When an exception occurs in ScalingRealizer.realizeParallelismOverrides (I ran 
into this locally), I couldn't retrieve the exception from the event handler 
table. The first approach is to catch it in JobAutoScalerImpl:
{code:java}
try {
    scalingRealizer.realizeParallelismOverrides(ctx, userOverrides);
} catch (Exception e) {
    // Surface the scaling failure as an event instead of only logging it.
    eventHandler.handleException(ctx, AUTOSCALER_ERROR, e);
}
{code}
The other approach is to handle it separately in RescaleApiScalingRealizer, but 
I prefer the former.

 


was (Author: JIRAUSER296932):
When an exception occurs in ScalingRealizer.realizeParallelismOverrides (I ran 
into this locally), I couldn't retrieve the exception from the event handler 
table. The first approach is to catch it in JobAutoScalerImpl:
{code:java}
// code placeholder
try {
    scalingRealizer.realizeParallelismOverrides(ctx, userOverrides);
} catch (Exception e) {
    // Surface the scaling failure as an event instead of only logging it.
    eventHandler.handleException(ctx, AUTOSCALER_ERROR, e);
}
{code}
The other approach is to handle it separately in RescaleApiScalingRealizer, but 
I prefer the former.

 

> eventHandler in RescaleApiScalingRealizer should handle the event of scaling failure
> --
>
> Key: FLINK-35896
> URL: https://issues.apache.org/jira/browse/FLINK-35896
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler
>Affects Versions: 2.0.0
>Reporter: yuanfenghu
>Priority: Minor
>
> When using flink-autoscaler-standalone, if a failure occurs during the 
> scaling process, the failure event needs to be handled through the eventHandler.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35896) eventHandler in RescaleApiScalingRealizer should handle the event of scaling failure

2024-07-25 Thread yuanfenghu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17868631#comment-17868631
 ] 

yuanfenghu commented on FLINK-35896:


When an exception occurs in ScalingRealizer.realizeParallelismOverrides (I ran 
into this locally), I couldn't retrieve the exception from the event handler 
table. The first approach is to catch it in JobAutoScalerImpl:
{code:java}
// code placeholder
try {
    scalingRealizer.realizeParallelismOverrides(ctx, userOverrides);
} catch (Exception e) {
    // Surface the scaling failure as an event instead of only logging it.
    eventHandler.handleException(ctx, AUTOSCALER_ERROR, e);
}
{code}
The other approach is to handle it separately in RescaleApiScalingRealizer, but 
I prefer the former.

 

> eventHandler in RescaleApiScalingRealizer should handle the event of scaling failure
> --
>
> Key: FLINK-35896
> URL: https://issues.apache.org/jira/browse/FLINK-35896
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler
>Affects Versions: 2.0.0
>Reporter: yuanfenghu
>Priority: Minor
>
> When using flink-autoscaler-standalone, if a failure occurs during the 
> scaling process, the failure event needs to be handled through the eventHandler.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35896) eventHandler in RescaleApiScalingRealizer should handle the event of scaling failure

2024-07-25 Thread yuanfenghu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17868639#comment-17868639
 ] 

yuanfenghu commented on FLINK-35896:


> RescaleApiScalingRealizer#realizeParallelismOverrides [1] will catch the 
>Exception, and only log it. So does your approach1 remove this catch, right?

 
The log is printed, but I feel it would be better to also record it as an 
event. We record an event when scaling succeeds, so we should also record one 
when an exception occurs. Scaling actions are relatively low-frequency anyway, 
so recording them as events looks reasonable.
 
> Also, may I know what exception happens in 
>ScalingRealizer.realizeParallelismOverrides?
 

Yes, our environment wraps the Flink REST API, and we see some internal 
container call errors; that part differs from the community setup. However, 
even when the Flink jobmanager's REST endpoint is called directly, exceptions 
such as timeouts can still occur.
 

> eventHandler in RescaleApiScalingRealizer should handle the event of scaling failure
> --
>
> Key: FLINK-35896
> URL: https://issues.apache.org/jira/browse/FLINK-35896
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler
>Affects Versions: 2.0.0
>Reporter: yuanfenghu
>Priority: Minor
>
> When using flink-autoscaler-standalone, if a failure occurs during the 
> scaling process, the failure event needs to be handled through the eventHandler.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35903) Add CentralizedSlotAssigner support to adaptive scheduler

2024-07-26 Thread yuanfenghu (Jira)
yuanfenghu created FLINK-35903:
--

 Summary: Add CentralizedSlotAssigner support to adaptive scheduler
 Key: FLINK-35903
 URL: https://issues.apache.org/jira/browse/FLINK-35903
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Reporter: yuanfenghu


h2. *Question:*

When using the adaptive scheduler, reducing a job's parallelism via the REST 
API triggers a restart of the job. If numberOfTaskSlots in my taskmanager is 
greater than 1, this may leave some taskmanagers with idle slots, so the 
taskmanager resources cannot be released.
h3. *How can this issue be resolved:*

We need a new SlotAssigner strategy: when the above process is triggered, 
slots should be requested in a centralized way, packed onto as few 
taskmanagers as possible, so that unneeded taskmanagers can be released.
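
An illustrative sketch of the "centralized" assignment idea (this is not 
Flink's SlotAssigner interface; names and structure are assumptions): satisfy 
the slot demand from as few taskmanagers as possible so that fully drained 
taskmanagers can be released.
{code:java}
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public final class CentralizedAssignmentSketch {

    /** Returns slots to take per TM, preferring TMs with the most free slots. */
    public static Map<String, Integer> assign(
            Map<String, Integer> freeSlotsPerTm, int demand) {
        List<Map.Entry<String, Integer>> tms =
                new ArrayList<>(freeSlotsPerTm.entrySet());
        tms.sort((a, b) -> b.getValue() - a.getValue()); // most free slots first
        Map<String, Integer> plan = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> tm : tms) {
            if (demand <= 0) {
                break;
            }
            int take = Math.min(tm.getValue(), demand);
            plan.put(tm.getKey(), take);
            demand -= take;
        }
        // TMs absent from the plan keep all slots free and can be released.
        return plan;
    }

    public static void main(String[] args) {
        Map<String, Integer> free = Map.of("tm-1", 2, "tm-2", 4, "tm-3", 2);
        // A demand of 4 is fully served by tm-2; tm-1 and tm-3 stay empty.
        System.out.println(assign(free, 4)); // {tm-2=4}
    }
}
{code}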



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35903) Add CentralizedSlotAssigner support to adaptive scheduler

2024-07-26 Thread yuanfenghu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35903?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuanfenghu updated FLINK-35903:
---
Affects Version/s: 2.0.0

> Add CentralizedSlotAssigner support to adaptive scheduler
> -
>
> Key: FLINK-35903
> URL: https://issues.apache.org/jira/browse/FLINK-35903
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 2.0.0
>Reporter: yuanfenghu
>Priority: Major
>
> h2. *Question:*
> When using the adaptive scheduler, reducing a job's parallelism via the REST 
> API triggers a restart of the job. If numberOfTaskSlots in my taskmanager is 
> greater than 1, this may leave some taskmanagers with idle slots, so the 
> taskmanager resources cannot be released.
> h3. *How can this issue be resolved:*
> We need a new SlotAssigner strategy: when the above process is triggered, 
> slots should be requested in a centralized way, packed onto as few 
> taskmanagers as possible, so that unneeded taskmanagers can be released.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-35903) Add CentralizedSlotAssigner support to adaptive scheduler

2024-07-26 Thread yuanfenghu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35903?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuanfenghu closed FLINK-35903.
--
Resolution: Duplicate

> Add CentralizedSlotAssigner support to adaptive scheduler
> -
>
> Key: FLINK-35903
> URL: https://issues.apache.org/jira/browse/FLINK-35903
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 2.0.0
>Reporter: yuanfenghu
>Priority: Major
>
> h2. *Question:*
> When using the adaptive scheduler, reducing a job's parallelism via the REST 
> API triggers a restart of the job. If numberOfTaskSlots in my taskmanager is 
> greater than 1, this may leave some taskmanagers with idle slots, so the 
> taskmanager resources cannot be released.
> h3. *How can this issue be resolved:*
> We need a new SlotAssigner strategy: when the above process is triggered, 
> slots should be requested in a centralized way, packed onto as few 
> taskmanagers as possible, so that unneeded taskmanagers can be released.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35903) Add CentralizedSlotAssigner support to adaptive scheduler

2024-07-26 Thread yuanfenghu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35903?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17868874#comment-17868874
 ] 

yuanfenghu commented on FLINK-35903:


[~Zhanghao Chen] thansk, I closed this issue
 

> Add CentralizedSlotAssigner support to adaptive scheduler
> -
>
> Key: FLINK-35903
> URL: https://issues.apache.org/jira/browse/FLINK-35903
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 2.0.0
>Reporter: yuanfenghu
>Priority: Major
>
> h2. *Question:*
> When using the adaptive scheduler, reducing a job's parallelism via the REST 
> API triggers a restart of the job. If numberOfTaskSlots in my taskmanager is 
> greater than 1, this may leave some taskmanagers with idle slots, so the 
> taskmanager resources cannot be released.
> h3. *How can this issue be resolved:*
> We need a new SlotAssigner strategy: when the above process is triggered, 
> slots should be requested in a centralized way, packed onto as few 
> taskmanagers as possible, so that unneeded taskmanagers can be released.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-35903) Add CentralizedSlotAssigner support to adaptive scheduler

2024-07-26 Thread yuanfenghu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35903?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17868874#comment-17868874
 ] 

yuanfenghu edited comment on FLINK-35903 at 7/26/24 7:19 AM:
-

[~Zhanghao Chen] thanks, I closed this issue
 


was (Author: JIRAUSER296932):
[~Zhanghao Chen] thansk, I closed this issue
 

> Add CentralizedSlotAssigner support to adaptive scheduler
> -
>
> Key: FLINK-35903
> URL: https://issues.apache.org/jira/browse/FLINK-35903
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 2.0.0
>Reporter: yuanfenghu
>Priority: Major
>
> h2. *Question:*
> When using the adaptive scheduler, reducing a job's parallelism via the REST 
> API triggers a restart of the job. If numberOfTaskSlots in my taskmanager is 
> greater than 1, this may leave some taskmanagers with idle slots, so the 
> taskmanager resources cannot be released.
> h3. *How can this issue be resolved:*
> We need a new SlotAssigner strategy: when the above process is triggered, 
> slots should be requested in a centralized way, packed onto as few 
> taskmanagers as possible, so that unneeded taskmanagers can be released.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35903) Add CentralizedSlotAssigner support to adaptive scheduler

2024-07-26 Thread yuanfenghu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35903?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17868877#comment-17868877
 ] 

yuanfenghu commented on FLINK-35903:


> Instead, all implementations of SlotAssigner should ensure TM can be released 
> when parallelism is reduced.
 

Sounds good!

> Add CentralizedSlotAssigner support to adaptive scheduler
> -
>
> Key: FLINK-35903
> URL: https://issues.apache.org/jira/browse/FLINK-35903
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 2.0.0
>Reporter: yuanfenghu
>Priority: Major
>
> h2. *Question:*
> When using the adaptive scheduler, reducing a job's parallelism via the REST 
> API triggers a restart of the job. If numberOfTaskSlots in my taskmanager is 
> greater than 1, this may leave some taskmanagers with idle slots, so the 
> taskmanager resources cannot be released.
> h3. *How can this issue be resolved:*
> We need a new SlotAssigner strategy: when the above process is triggered, 
> slots should be requested in a centralized way, packed onto as few 
> taskmanagers as possible, so that unneeded taskmanagers can be released.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34535) Support JobPlanInfo for the explain result

2024-02-27 Thread yuanfenghu (Jira)
yuanfenghu created FLINK-34535:
--

 Summary: Support JobPlanInfo for the explain result
 Key: FLINK-34535
 URL: https://issues.apache.org/jira/browse/FLINK-34535
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Reporter: yuanfenghu


In the Flink SQL EXPLAIN syntax, we can set ExplainDetails to output the 
JSON_EXECUTION_PLAN, but we cannot output the JobPlanInfo. If the explain 
result could include this information (analogous to JobPlanInfo), I could 
combine it with the parameter `pipeline.jobvertex-parallelism-overrides` to 
set up my per-vertex task parallelism.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34535) Support JobPlanInfo for the explain result

2024-02-27 Thread yuanfenghu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17821481#comment-17821481
 ] 

yuanfenghu commented on FLINK-34535:


cc [~luoyuxia] 

> Support JobPlanInfo for the explain result
> --
>
> Key: FLINK-34535
> URL: https://issues.apache.org/jira/browse/FLINK-34535
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: yuanfenghu
>Priority: Major
>
> In the Flink SQL EXPLAIN syntax, we can set ExplainDetails to output the 
> JSON_EXECUTION_PLAN, but we cannot output the JobPlanInfo. If the explain 
> result could include this information (analogous to JobPlanInfo), I could 
> combine it with the parameter `pipeline.jobvertex-parallelism-overrides` to 
> set up my per-vertex task parallelism.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34535) Support JobPlanInfo for the explain result

2024-02-29 Thread yuanfenghu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17822399#comment-17822399
 ] 

yuanfenghu commented on FLINK-34535:


[~lincoln.86xy] 
Thank you for your comment.

> Regarding to the motivation 'combine it with the parameter 
> `pipeline.jobvertex-parallelism-overrides` to set up my task parallelism', 
> could you explain more about it?

The `pipeline.jobvertex-parallelism-overrides` parameter can modify the 
parallelism of each vertex before Flink runs the job. It requires specifying 
the parallelism of each vertex in the jobgraph, like this:
{code:java}
pipeline.jobvertex-parallelism-overrides =
    0a448493b4782967b150582570326227:4,bc764cd8ddf7a0cff126f51c16239658:3 {code}
This way, when Flink runs the job, the parallelism of the vertex ids 
0a448493b4782967b150582570326227 and bc764cd8ddf7a0cff126f51c16239658 will be 
set to 4 and 3 respectively.

So my motivation is: I want to set the parallelism of each vertex in a job 
generated by Flink SQL, but in Flink SQL parallelism is set globally, so I 
need to obtain each jobVertexId before the job is run. The existing explain 
does not return this information, so I want the explain result to include it.
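
As a hypothetical sketch of the intended workflow (the plan input below is an 
assumption, since today EXPLAIN does not return vertex ids): once the vertex 
ids were known, deriving the override string would be straightforward.
{code:java}
import java.util.LinkedHashMap;
import java.util.Map;

public final class OverridesFromPlan {

    /** Builds the value for pipeline.jobvertex-parallelism-overrides. */
    public static String buildOverrides(Map<String, Integer> parallelismPerVertexId) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, Integer> e : parallelismPerVertexId.entrySet()) {
            if (sb.length() > 0) {
                sb.append(',');
            }
            sb.append(e.getKey()).append(':').append(e.getValue());
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Vertex ids as they would appear in the job plan (examples from above).
        Map<String, Integer> p = new LinkedHashMap<>();
        p.put("0a448493b4782967b150582570326227", 4);
        p.put("bc764cd8ddf7a0cff126f51c16239658", 3);
        System.out.println(buildOverrides(p));
    }
}
{code}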

> (Also adding new mode to current `ExplainDetail` is a public api change, 
>there should be a 
>[FLIP|https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals?src=contextnavpagetreemode]
> discussion)

 
I noticed ExplainDetail has a @PublicEvolving annotation. But my feature is 
relatively simple; does it really need a separate FLIP? Alternatively, what 
about putting this information into JSON_EXECUTION_PLAN?

 
 

>  COMPILE PLAN

COMPILE PLAN also seems like a reasonable approach. Could you help me @ some 
people who are more familiar with this area (Flink SQL) to discuss their 
views on this issue?
 
 
 

 

 

> Support JobPlanInfo for the explain result
> --
>
> Key: FLINK-34535
> URL: https://issues.apache.org/jira/browse/FLINK-34535
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: yuanfenghu
>Priority: Major
>  Labels: pull-request-available
>
> In the Flink SQL EXPLAIN syntax, we can set ExplainDetails to output the 
> JSON_EXECUTION_PLAN, but we cannot output the JobPlanInfo. If the explain 
> result could include this information (analogous to JobPlanInfo), I could 
> combine it with the parameter `pipeline.jobvertex-parallelism-overrides` to 
> set up my per-vertex task parallelism.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34535) Support JobPlanInfo for the explain result

2024-03-01 Thread yuanfenghu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17822472#comment-17822472
 ] 

yuanfenghu commented on FLINK-34535:


Hi [~jeyhunkarimov] 

Thank you for your comment. In fact, my idea is very simple: when using Flink 
SQL via sql-client or sql-gateway, I would like a way to set the parallelism 
of each vertex in the generated Flink job individually.

For example, the jobgraph generated by a simple job of mine is:

 

                              -> window1  -> sink1

KAFKA SOURCE ->  window2  -> sink2

                              -> window3  -> sink3

 
In Flink SQL, I can set the parallelism of the entire job through SET 
parallelism.default=x, but I cannot set the parallelism of each vertex 
individually.

So if I could get the jobgraph information of the whole job (with vertex ids) through the explain syntax:

                                     -> window1(id2)  -> sink1(id5)

KAFKA SOURCE(id1) ->  window2(id3)  -> sink2(id6)

                                     -> window3(id4)  -> sink3(id7)

combined with the parameter pipeline.jobvertex-parallelism-overrides
 
 

> Support JobPlanInfo for the explain result
> --
>
> Key: FLINK-34535
> URL: https://issues.apache.org/jira/browse/FLINK-34535
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: yuanfenghu
>Priority: Major
>  Labels: pull-request-available
>
> In the Flink SQL EXPLAIN syntax, we can set ExplainDetails to output the 
> JSON_EXECUTION_PLAN, but we cannot output the JobPlanInfo. If the explain 
> result could include this information (analogous to JobPlanInfo), I could 
> combine it with the parameter `pipeline.jobvertex-parallelism-overrides` to 
> set up my per-vertex task parallelism.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-34535) Support JobPlanInfo for the explain result

2024-03-01 Thread yuanfenghu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17822472#comment-17822472
 ] 

yuanfenghu edited comment on FLINK-34535 at 3/1/24 9:45 AM:


Hi [~jeyhunkarimov] 

Thank you for your comment. In fact, my idea is very simple. I hope that when 
using flink sql, sql-client or sql-gateway, I can have a way to independently 
set the parallelism of each vertex in the generated flink task.

For example, the jobgraph generated by a simple job of mine is:

 

                              -> window1  -> sink1

KAFKA SOURCE ->  window2  -> sink2

                              -> window3  -> sink3

 
In Flink SQL, I can set the parallelism of the entire job through SET 
parallelism.default=x, but I cannot set the parallelism of each vertex 
individually.

 
So if I could get the jobgraph information of the entire job through the 
explain syntax (with vertex ids):

 

                                     -> window1(id2)  -> sink1(id5)

KAFKA SOURCE(id1) ->  window2(id3)  -> sink2(id6)

                                     -> window3(id4)  -> sink3(id7)

 
Combined with the parameter pipeline.jobvertex-parallelism-overrides, like:

set 
pipeline.jobvertex-parallelism-overrides=id1:1,id2:2,id3:2,id4:2,id5:1,id6:1,id7:1

 
I can change the graph to:

                                           -> window1(id2 p=2)  -> sink1(id5 
p=1)

KAFKA SOURCE(id1 p=1) ->  window2(id3 p=2)  -> sink2(id6 p=1)

                                           -> window3(id4  p=2)  -> sink3(id7 
p=1)

 
So I hope this information can be included in the explain output. This may 
not be the best method; maybe it could be done through `COMPILE PLAN`?
 

 
 


was (Author: JIRAUSER296932):
Hi [~jeyhunkarimov] 

Thank you for your comment. In fact, my idea is very simple: when using Flink 
SQL via sql-client or sql-gateway, I would like a way to set the parallelism 
of each vertex in the generated Flink job individually.

For example, the jobgraph generated by a simple job of mine is:

 

                              -> window1  -> sink1

KAFKA SOURCE ->  window2  -> sink2

                              -> window3  -> sink3

 
In Flink SQL, I can set the parallelism of the entire job through SET 
parallelism.default=x, but I cannot set the parallelism of each vertex 
individually.

So if I could get the jobgraph information of the whole job (with vertex ids) through the explain syntax:

                                     -> window1(id2)  -> sink1(id5)

KAFKA SOURCE(id1) ->  window2(id3)  -> sink2(id6)

                                     -> window3(id4)  -> sink3(id7)

combined with the parameter pipeline.jobvertex-parallelism-overrides
 
 

> Support JobPlanInfo for the explain result
> --
>
> Key: FLINK-34535
> URL: https://issues.apache.org/jira/browse/FLINK-34535
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: yuanfenghu
>Priority: Major
>  Labels: pull-request-available
>
> In the Flink SQL EXPLAIN syntax, we can set ExplainDetails to output the 
> JSON_EXECUTION_PLAN, but we cannot output the JobPlanInfo. If the explain 
> result could include this information (analogous to JobPlanInfo), I could 
> combine it with the parameter `pipeline.jobvertex-parallelism-overrides` to 
> set up my per-vertex task parallelism.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35035) Reduce job pause time when cluster resources are expanded in adaptive mode

2024-04-07 Thread yuanfenghu (Jira)
yuanfenghu created FLINK-35035:
--

 Summary: Reduce job pause time when cluster resources are expanded 
in adaptive mode
 Key: FLINK-35035
 URL: https://issues.apache.org/jira/browse/FLINK-35035
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Task
Affects Versions: 1.19.0
Reporter: yuanfenghu


When 'jobmanager.scheduler = adaptive', job graph changes triggered by cluster 
expansion cause long periods of task stagnation. We should reduce this impact.
As an example:
I have a jobgraph: [v1 (maxp=10, minp=1)] -> [v2 (maxp=10, minp=1)]
When my cluster has 5 slots, the job will be executed as [v1 p5] -> [v2 p5].
When I add slots, the job graph change is triggered by
org.apache.flink.runtime.scheduler.adaptive.ResourceListener#onNewResourcesAvailable.
However, the five new slots I added are not discovered at the same time (for 
convenience, I assume that a taskmanager has one slot): no matter what 
environment we add resources in, we cannot guarantee that all new slots are 
added at once, so onNewResourcesAvailable triggers repeatedly. If each new 
slot arrives at a certain interval, the jobgraph will keep changing during 
this period. What I hope for is a configurable stabilization time for cluster 
resources: the jobgraph change should only be triggered after the number of 
cluster slots has been stable for a certain period of time, to avoid this 
situation.
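
A hedged sketch of the proposed stabilization window (names and structure are 
illustrative, not the adaptive scheduler's actual implementation): only allow 
a rescale once the observed slot count has been stable for a configured 
period.
{code:java}
import java.time.Duration;
import java.time.Instant;

public final class ResourceStabilizationSketch {

    private final Duration stabilizationWindow;
    private int lastSlotCount = -1;
    private Instant stableSince = Instant.EPOCH;

    public ResourceStabilizationSketch(Duration stabilizationWindow) {
        this.stabilizationWindow = stabilizationWindow;
    }

    /** Called when resources are observed; true when it is safe to rescale. */
    public boolean onSlotCountObserved(int slotCount, Instant now) {
        if (slotCount != lastSlotCount) {
            // Resources are still changing: remember the new count and
            // restart the stability window instead of restarting the job.
            lastSlotCount = slotCount;
            stableSince = now;
            return false;
        }
        return Duration.between(stableSince, now)
                .compareTo(stabilizationWindow) >= 0;
    }

    public static void main(String[] args) {
        ResourceStabilizationSketch s =
                new ResourceStabilizationSketch(Duration.ofSeconds(10));
        Instant t0 = Instant.now();
        System.out.println(s.onSlotCountObserved(6, t0));                  // false
        System.out.println(s.onSlotCountObserved(10, t0.plusSeconds(5)));  // false
        System.out.println(s.onSlotCountObserved(10, t0.plusSeconds(16))); // true
    }
}
{code}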



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35035) Reduce job pause time when cluster resources are expanded in adaptive mode

2024-04-07 Thread yuanfenghu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35035?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuanfenghu updated FLINK-35035:
---
Description: 
When 'jobmanager.scheduler = adaptive', job graph changes triggered by cluster 
expansion cause long periods of task stagnation. We should reduce this impact.
As an example:
I have a jobgraph: [v1 (maxp=10, minp=1)] -> [v2 (maxp=10, minp=1)]
When my cluster has 5 slots, the job will be executed as [v1 p5] -> [v2 p5].
When I add slots, the job graph change is triggered by
org.apache.flink.runtime.scheduler.adaptive.ResourceListener#onNewResourcesAvailable.
However, the five new slots I added are not discovered at the same time (for 
convenience, I assume that a taskmanager has one slot): no matter what 
environment we add resources in, we cannot guarantee that all new slots are 
added at once, so onNewResourcesAvailable triggers repeatedly. If each new 
slot arrives at a certain interval, the jobgraph will keep changing during 
this period. What I hope for is a configurable stabilization time for cluster 
resources: the jobgraph change should only be triggered after the number of 
cluster slots has been stable for a certain period of time, to avoid this 
situation.

  was:
When 'jobmanager.scheduler = adaptive', job graph changes triggered by cluster 
expansion cause long periods of task stagnation. We should reduce this impact.
As an example:
I have a jobgraph: [v1 (maxp=10, minp=1)] -> [v2 (maxp=10, minp=1)]
When my cluster has 5 slots, the job will be executed as [v1 p5] -> [v2 p5].
When I add slots, the job graph change is triggered by
org.apache.flink.runtime.scheduler.adaptive.ResourceListener#onNewResourcesAvailable.
However, the five new slots I added are not discovered at the same time (for 
convenience, I assume that a taskmanager has one slot): no matter what 
environment we add resources in, we cannot guarantee that all new slots are 
added at once, so onNewResourcesAvailable triggers repeatedly. If each new 
slot arrives at a certain interval, the jobgraph will keep changing during 
this period. What I hope for is a configurable stabilization time for cluster 
resources: the jobgraph change should only be triggered after the number of 
cluster slots has been stable for a certain period of time, to avoid this 
situation.


> Reduce job pause time when cluster resources are expanded in adaptive mode
> --
>
> Key: FLINK-35035
> URL: https://issues.apache.org/jira/browse/FLINK-35035
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Affects Versions: 1.19.0
>Reporter: yuanfenghu
>Priority: Minor
>
> When 'jobmanager.scheduler = adaptive' , job graph changes triggered by 
> cluster expansion will cause long-term task stagnation. We should reduce this 
> impact.
> As an example:
> I have jobgraph for : [v1 (maxp=10 minp = 1)] -> [v2 (maxp=10, minp=1)]
> When my cluster has 5 slots, the job will be executed as [v1 p5]->[v2 p5]
> When I add slots the task will trigger jobgraph changes,by
> org.apache.flink.runtime.scheduler.adaptive.ResourceListener#onNewResourcesAvailable,
> However, the five new slots I added were not discovered at the same time (for 
> convenience, I assume that a taskmanager has one slot), because no matter 
> what environment we add, we cannot guarantee that the new slots will be added 
> at once, so this will cause onNewResourcesAvailable triggers repeatedly
> ,If each new slot action has a certain interval, then the jobgraph will 
> continue to change during this period. What I hope is that there will be a 
> stable time to configure the cluster resources, and then go to it after the 
> number of cluster slots has been stable for a certain period of time. Trigger 
> jobgraph changes to avoid this situation



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35035) Reduce job pause time when cluster resources are expanded in adaptive mode

2024-04-07 Thread yuanfenghu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35035?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuanfenghu updated FLINK-35035:
---
Description: 
When 'jobmanager.scheduler = adaptive', job graph changes triggered by cluster 
expansion cause long periods of task stagnation. We should reduce this impact.
As an example:
I have a jobgraph: [v1 (maxp=10, minp=1)] -> [v2 (maxp=10, minp=1)]
When my cluster has 5 slots, the job will be executed as [v1 p5] -> [v2 p5].
When I add slots, the job graph change is triggered by
org.apache.flink.runtime.scheduler.adaptive.ResourceListener#onNewResourcesAvailable.
However, the five new slots I added are not discovered at the same time (for 
convenience, I assume that a taskmanager has one slot): no matter what 
environment we add resources in, we cannot guarantee that all new slots are 
added at once, so onNewResourcesAvailable triggers repeatedly. If each new 
slot arrives at a certain interval, the jobgraph will keep changing during 
this period. What I hope for is a configurable stabilization time for cluster 
resources: the jobgraph change should only be triggered after the number of 
cluster slots has been stable for a certain period of time, to avoid this 
situation.

  was:
When 'jobmanager.scheduler = adaptive', job graph changes triggered by cluster 
expansion cause long periods of task stagnation. We should reduce this impact.
As an example:
I have a jobgraph: [v1 (maxp=10, minp=1)] -> [v2 (maxp=10, minp=1)]
When my cluster has 5 slots, the job will be executed as [v1 p5] -> [v2 p5].
When I add slots, the job graph change is triggered by
org.apache.flink.runtime.scheduler.adaptive.ResourceListener#onNewResourcesAvailable.
However, the five new slots I added are not discovered at the same time (for 
convenience, I assume that a taskmanager has one slot): no matter what 
environment we add resources in, we cannot guarantee that all new slots are 
added at once, so onNewResourcesAvailable triggers repeatedly. If each new 
slot arrives at a certain interval, the jobgraph will keep changing during 
this period. What I hope for is a configurable stabilization time for cluster 
resources: the jobgraph change should only be triggered after the number of 
cluster slots has been stable for a certain period of time, to avoid this 
situation.


> Reduce job pause time when cluster resources are expanded in adaptive mode
> --
>
> Key: FLINK-35035
> URL: https://issues.apache.org/jira/browse/FLINK-35035
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Affects Versions: 1.19.0
>Reporter: yuanfenghu
>Priority: Minor
>
> When 'jobmanager.scheduler = adaptive', job graph changes triggered by 
> cluster expansion cause long periods of task stagnation. We should reduce 
> this impact.
> As an example:
> I have a jobgraph: [v1 (maxp=10, minp=1)] -> [v2 (maxp=10, minp=1)]
> When my cluster has 5 slots, the job will be executed as [v1 p5] -> [v2 p5].
> When I add slots, the job graph change is triggered by
> org.apache.flink.runtime.scheduler.adaptive.ResourceListener#onNewResourcesAvailable.
> However, the five new slots I added are not discovered at the same time (for 
> convenience, I assume that a taskmanager has one slot): no matter what 
> environment we add resources in, we cannot guarantee that all new slots are 
> added at once, so onNewResourcesAvailable triggers repeatedly. If each new 
> slot arrives at a certain interval, the jobgraph will keep changing during 
> this period. What I hope for is a configurable stabilization time for 
> cluster resources: the jobgraph change should only be triggered after the 
> number of cluster slots has been stable for a certain period of time, to 
> avoid this situation.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35035) Reduce job pause time when cluster resources are expanded in adaptive mode

2024-04-08 Thread yuanfenghu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17835106#comment-17835106
 ] 

yuanfenghu commented on FLINK-35035:


[~bgeng777] 
Thank you for your comment. As you understood, I hope that if a new TM appears 
while the job is running, the job does not restart immediately but waits for a 
period of time.
Below I will explain the process in detail:
Taking the example I gave at the beginning, assume the current job is [v1 
(maxp=10, minp=1)] -> [v2 (maxp=10, minp=1)] but the cluster has 5 slots in 
total, so the job runs as [v1 p5] -> [v2 p5]. I now add 5 slots and hope the 
job will run as [v1 p10] -> [v2 p10]. As soon as the number of cluster slots 
becomes 6, the job immediately triggers a restart. At that point, according to 
the jobmanager.adaptive-scheduler.resource-stabilization-timeout parameter, 
the job waits for resources for a period of time during the restart phase. If 
the slot count does not reach the target of 10 within that period, the job 
runs with lower parallelism; but my slots do reach 10 a little later, which 
triggers another rescale and restart. So I end up with one extra restart and 
one extra resource-waiting phase. Why not wait before the first restart, 
either for a period of time or until the slot count satisfies p=10, before 
triggering the restart (scale-up) action?

 
[~echauchot] hi, could you help me look into this issue? It seems similar to 
FLINK-21883.
 
Thanks
 
 

> Reduce job pause time when cluster resources are expanded in adaptive mode
> --
>
> Key: FLINK-35035
> URL: https://issues.apache.org/jira/browse/FLINK-35035
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Affects Versions: 1.19.0
>Reporter: yuanfenghu
>Priority: Minor
>
> When 'jobmanager.scheduler = adaptive', job graph changes triggered by 
> cluster expansion cause long periods of task stagnation. We should reduce 
> this impact.
> As an example:
> I have a jobgraph: [v1 (maxp=10, minp=1)] -> [v2 (maxp=10, minp=1)]
> When my cluster has 5 slots, the job will be executed as [v1 p5] -> [v2 p5].
> When I add slots, the job graph change is triggered by
> org.apache.flink.runtime.scheduler.adaptive.ResourceListener#onNewResourcesAvailable.
> However, the five new slots I added are not discovered at the same time (for 
> convenience, I assume that a taskmanager has one slot): no matter what 
> environment we add resources in, we cannot guarantee that all new slots are 
> added at once, so onNewResourcesAvailable triggers repeatedly. If each new 
> slot arrives at a certain interval, the jobgraph will keep changing during 
> this period. What I hope for is a configurable stabilization time for 
> cluster resources: the jobgraph change should only be triggered after the 
> number of cluster slots has been stable for a certain period of time, to 
> avoid this situation.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-35035) Reduce job pause time when cluster resources are expanded in adaptive mode

2024-04-08 Thread yuanfenghu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17835106#comment-17835106
 ] 

yuanfenghu edited comment on FLINK-35035 at 4/9/24 2:22 AM:


[~bgeng777] 
Thank you for your comment. As you understood, I hope that if a new TM appears 
while the job is running, the job does not restart immediately but waits for a 
period of time.
Below I will explain the process in detail:
Taking the example I gave at the beginning, assume the current job is [v1 
(maxp=10, minp=1)] -> [v2 (maxp=10, minp=1)] but the cluster has 5 slots in 
total, so the job runs as [v1 p5] -> [v2 p5]. I now add 5 slots and hope the 
job will run as [v1 p10] -> [v2 p10]. As soon as the number of cluster slots 
becomes 6, the job immediately triggers a restart. At that point, according to 
the jobmanager.adaptive-scheduler.resource-stabilization-timeout parameter, 
the job waits for resources for a period of time during the restart phase. If 
the slot count does not reach the target of 10 within that period, the job 
runs with lower parallelism; but my slots do reach 10 a little later, which 
triggers another rescale and restart. So I end up with one extra restart and 
one extra resource-waiting phase. Why not wait before the first restart, 
either for a period of time or until the slot count satisfies p=10, before 
triggering the restart (scale-up) action?

 
[~echauchot] hi, could you help me look into this issue? It seems similar to 
FLINK-21883.
 
Thanks
 
 


was (Author: JIRAUSER296932):
[~bgeng777] 
Thank you for your comment. As you understood, I hope that if a new TM appears 
while the job is running, the job does not restart immediately but waits for a 
period of time.
Below I will explain the process in detail:
Taking the example I gave at the beginning, assume the current job is [v1 
(maxp=10, minp=1)] -> [v2 (maxp=10, minp=1)] but the cluster has 5 slots in 
total, so the job runs as [v1 p5] -> [v2 p5]. I now add 5 slots and hope the 
job will run as [v1 p10] -> [v2 p10]. As soon as the number of cluster slots 
becomes 6, the job immediately triggers a restart. At that point, according to 
the jobmanager.adaptive-scheduler.resource-stabilization-timeout parameter, 
the job waits for resources for a period of time during the restart phase. If 
the slot count does not reach the target of 10 within that period, the job 
runs with lower parallelism; but my slots do reach 10 a little later, which 
triggers another rescale and restart. So I end up with one extra restart and 
one extra resource-waiting phase. Why not wait before the first restart, 
either for a period of time or until the slot count satisfies p=10, before 
triggering the restart (scale-up) action?

 
[~echauchot] hi, could you help me look into this issue? It seems similar to 
FLINK-21883.
 
Thanks
 
 

> Reduce job pause time when cluster resources are expanded in adaptive mode
> --
>
> Key: FLINK-35035
> URL: https://issues.apache.org/jira/browse/FLINK-35035
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Affects Versions: 1.19.0
>Reporter: yuanfenghu
>Priority: Minor
>
> When 'jobmanager.scheduler = adaptive', job graph changes triggered by 
> cluster expansion cause long periods of task stagnation. We should reduce 
> this impact.
> As an example:
> I have a jobgraph: [v1 (maxp=10, minp=1)] -> [v2 (maxp=10, minp=1)]
> When my cluster has 5 slots, the job will be executed as [v1 p5] -> [v2 p5].
> When I add slots, the job graph change is triggered by
> org.apache.flink.runtime.scheduler.adaptive.ResourceListener#onNewResourcesAvailable.
> However, the five new slots I added are not discovered at the same time (for 
> convenience, I assume that a taskmanager has one slot): no matter what 
> environment we add resources in, we cannot guarantee that all new slots are 
> added at once, so onNewResourcesAvailable triggers repeatedly. If each new 
> slot arrives at a certain interval, the jobgraph will keep changing during 
> this period. What I hope for is a configurable stabilization time for 
> cluster resources: the jobgraph change should only be triggered after the 
> number of cluster slots has been stable for a certain period of time, to 
> avoid this situation.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35035) Reduce job pause time when cluster resources are expanded in adaptive mode

2024-04-09 Thread yuanfenghu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17835564#comment-17835564
 ] 

yuanfenghu commented on FLINK-35035:


[~echauchot] 
Thank you for your reply, but I have some questions:

jobmanager.adaptive-scheduler.min-parallelism-increase is a JobManager 
parameter, so I cannot update its value after the job has started. Assuming it 
is set to 5, this causes some problems:

The original job is [v1 (maxp=10, minp=1)] -> [v2 (maxp=10, minp=1)]. If I 
call the REST API to override the parallelism to [v1 (maxp=12, minp=1)] 
-> [v2 (maxp=12, minp=1)] and then add slots to the cluster, I obviously only 
need 2 more slots to meet the requirement. But because min-parallelism-increase 
is not reached, the job does not trigger a rescale; it has to wait until 
scaling-interval.max is reached before triggering (scaling-interval.max needs 
to be set first). In this case, should I add a configuration item to support 
triggering it?
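For reference, a sketch of the two JobManager-level options involved (values 
are illustrative):
{code:java}
import org.apache.flink.configuration.Configuration;

public class RescaleTriggerConfigSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Minimum parallelism increase before an immediate rescale is triggered.
        conf.setString("jobmanager.adaptive-scheduler.min-parallelism-increase", "5");
        // Upper bound: force a rescale after this interval even if the
        // min-parallelism-increase threshold was never reached.
        conf.setString("jobmanager.adaptive-scheduler.scaling-interval.max", "10min");
        System.out.println(conf);
    }
}
{code}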
 




--
This message was sent by Atlassian Jira
(v8.20.10#820010)



[jira] [Comment Edited] (FLINK-35035) Reduce job pause time when cluster resources are expanded in adaptive mode

2024-04-09 Thread yuanfenghu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17835564#comment-17835564
 ] 

yuanfenghu edited comment on FLINK-35035 at 4/10/24 2:19 AM:
-

[~echauchot] 
Thank you for your reply, but I have some questions:

jobmanager.adaptive-scheduler.min-parallelism-increase is a JobManager 
parameter, so I cannot update its value after the cluster has started. 
Assuming it is set to 5, this causes some problems:

The original job is [v1 (maxp=10, minp=1)] -> [v2 (maxp=10, minp=1)]. If I 
call the REST API to override the parallelism to [v1 (maxp=12, minp=1)] 
-> [v2 (maxp=12, minp=1)] and then add slots to the cluster, I obviously only 
need 2 more slots to meet the requirement. But because min-parallelism-increase 
is not reached, the job does not trigger a rescale; it has to wait until 
scaling-interval.max is reached before triggering (scaling-interval.max needs 
to be set first). In this case, should I add a configuration item to support 
triggering it?

 

Maybe we can add a switch similar to 
jobmanager.adaptive-scheduler.min-parallelism-increase: when resources change, 
check whether the current resources fully satisfy the job's parallelism 
requirements. If they do, trigger rescheduling directly; if not, reschedule 
after scaling-interval.max. WDYT? [~echauchot] 
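A rough sketch of the proposed check (the class and method names below are 
hypothetical, not actual AdaptiveScheduler APIs):
{code:java}
import java.time.Duration;

// Hypothetical sketch: rescale immediately when the available slots fully
// satisfy the desired parallelism; otherwise fall back to the
// scaling-interval.max deadline.
public class ImmediateRescaleSketch {
    void onNewResourcesAvailable(
            int availableSlots, int desiredParallelism, Duration scalingIntervalMax) {
        if (availableSlots >= desiredParallelism) {
            rescaleNow();
        } else {
            scheduleRescaleAfter(scalingIntervalMax);
        }
    }

    void rescaleNow() { /* trigger the job graph change directly */ }

    void scheduleRescaleAfter(Duration delay) { /* defer to scaling-interval.max */ }
}
{code}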

Looking forward to your reply!

 





--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35035) Reduce job pause time when cluster resources are expanded in adaptive mode

2024-04-11 Thread yuanfenghu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836395#comment-17836395
 ] 

yuanfenghu commented on FLINK-35035:


[~echauchot] 

Thank you for your reply.

You seem to be looking at this issue from the perspective of Reactive Mode, 
because Reactive Mode uses only the cluster's resources as the criterion for 
task parallelism. I don't know if I understand that correctly.

But my scenario above is in non-Reactive Mode, just using the adaptive 
scheduler: I increase the parallelism of the running job from 10 to 12. 
However, because min-parallelism-increase=5, even when the cluster has enough 
slots for parallelism 12, the rescale cannot be triggered immediately; it has 
to wait for scaling-interval.max. My purpose is to trigger the rescale as soon 
as parallelism 12 can be satisfied, instead of having to wait for 
scaling-interval.max.




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-35035) Reduce job pause time when cluster resources are expanded in adaptive mode

2024-04-11 Thread yuanfenghu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836395#comment-17836395
 ] 

yuanfenghu edited comment on FLINK-35035 at 4/12/24 6:08 AM:
-

[~echauchot] 

Thank you for your reply.
I think you are looking at this scenario from the perspective of Reactive 
Mode, because Reactive Mode uses only the cluster's resources as the criterion 
for task parallelism. I don't know if I understand that correctly.

But my scenario above is in non-Reactive Mode; I just use the adaptive 
scheduler. I increase the parallelism of the running job from 10 to 12. 
However, because min-parallelism-increase=5, even when the cluster has enough 
slots for parallelism 12, the rescale cannot be triggered immediately; it has 
to wait for scaling-interval.max. My purpose is to trigger the rescale as soon 
as parallelism 12 is satisfied, instead of having to wait for 
scaling-interval.max or min-parallelism-increase.





--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35035) Reduce job pause time when cluster resources are expanded in adaptive mode

2024-04-12 Thread yuanfenghu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836504#comment-17836504
 ] 

yuanfenghu commented on FLINK-35035:


[~echauchot] 
Thank you for your patience in tracking this issue!
 
> The only thing is that you will have more frequent rescales (each time a slot 
> is added to the cluster) modulo slots that are added during the stabilization 
> period that do not lead to a rescale.

 
This is the problem. Imagine that I originally wanted to adjust the 
parallelism from 10 to 12. My steps are to first raise my job's maximum 
parallelism to 12 through the REST API and then add TaskManagers to the 
cluster. If min-parallelism-increase=1, my job may trigger the scaling process 
twice while the number of slots grows from 10 to 12, and this process may last 
for minutes. If min-parallelism-increase > 2, say 5, then my job has to wait 
until scaling-interval.max before scaling. I think we can optimize this 
process and let the job trigger scaling exactly when the slot count reaches 12.
 
 




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35035) Reduce job pause time when cluster resources are expanded in adaptive mode

2024-04-12 Thread yuanfenghu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836508#comment-17836508
 ] 

yuanfenghu commented on FLINK-35035:


> It is not slot per slot rescale; there will be only one rescale in these 
> cases:
>  * if the TM comes with 2 slots at once
>  * if the second slot comes during the stabilization timeout

 
Yes, there is only one rescale in both cases. However, if each TaskManager has 
only one slot and the interval between their registrations with the JobManager 
is relatively long, rescaling will be triggered twice.
 
> That being said, I know there is an ongoing reflection in the community to 
> decrease the overall timeouts during rescale.

Good. Could you please @ the people who worked on this? Maybe they can follow 
up on this question. Thanks!
 
 




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35926) During rescale, jobmanager has incorrect judgment logic for the max parallelism.

2024-07-30 Thread yuanfenghu (Jira)
yuanfenghu created FLINK-35926:
--

 Summary: During rescale, jobmanager has incorrect judgment logic 
for the max parallelism.
 Key: FLINK-35926
 URL: https://issues.apache.org/jira/browse/FLINK-35926
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Task
Affects Versions: 1.19.1
 Environment: flink-1.19.1

There is a high probability that 1.18 has the same problem
Reporter: yuanfenghu
 Attachments: image-2024-07-30-14-56-48-931.png, 
image-2024-07-30-14-59-26-976.png, image-2024-07-30-15-00-28-491.png

While using the adaptive scheduler, I modified the job's parallelism through 
the REST API, and incorrect decision logic caused the job to fail.
h2. Reproduce:
When I start a simple job with a parallelism of 128, the Max Parallelism of the 
job is set to 256 (through Flink's internal calculation logic). Then I take a 
savepoint of the job, modify the job's parallelism to 1, and restore the job 
from the savepoint. At this time, the Max Parallelism of the job is still 256:
 
!image-2024-07-30-14-56-48-931.png!

 
This is as expected. I then call the REST API to increase the parallelism to 
129 (which is obviously reasonable, since it is < 256), but the job throws an 
exception after restarting:
 
!image-2024-07-30-14-59-26-976.png!
Checking the job's details at this time shows that Max Parallelism has changed 
to 128:
 
!image-2024-07-30-15-00-28-491.png!

 
This can be reproduced reliably in a local setup.
 
h3. Causes:
 
In AdaptiveScheduler we recalculate the job's `VertexParallelismStore`, which 
results in the restarted job having the wrong max parallelism. This seems to be 
related to FLINK-21844 and FLINK-22084.
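For reference, Flink's default max-parallelism derivation explains the 
observed 256 -> 128 change when the store is recomputed from the restored 
parallelism of 1; the formula is roughly 
min(max(roundUpToPowerOfTwo(p + p / 2), 128), 32768). A minimal illustration:
{code:java}
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class MaxParallelismDemo {
    public static void main(String[] args) {
        // Job originally started with p=128: default max parallelism becomes 256.
        System.out.println(KeyGroupRangeAssignment.computeDefaultMaxParallelism(128)); // 256
        // Recomputed from the restored p=1: the lower bound kicks in, yielding 128.
        System.out.println(KeyGroupRangeAssignment.computeDefaultMaxParallelism(1));   // 128
    }
}
{code}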



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35926) During rescale, AdaptiveScheduler has incorrect judgment logic for the max parallelism.

2024-07-30 Thread yuanfenghu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuanfenghu updated FLINK-35926:
---
Summary: During rescale, AdaptiveScheduler has incorrect judgment logic for 
the max parallelism.  (was: During rescale, jobmanager has incorrect judgment 
logic for the max parallelism.)




--
This message was sent by Atlassian Jira
(v8.20.10#820010)




[jira] [Commented] (FLINK-36018) Support lazy scale down to avoid frequent rescaling

2024-08-09 Thread yuanfenghu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17872207#comment-17872207
 ] 

yuanfenghu commented on FLINK-36018:


[~fanrui], this seems like a good change. It is helpful for scenarios with 
frequent rescaling caused by reduced traffic, but I have a question: why not 
call it `scale-down.grace-period`, since we already have a similar parameter 
`scale-up.grace-period`?
 

> Support lazy scale down to avoid frequent rescaling
> ---
>
> Key: FLINK-36018
> URL: https://issues.apache.org/jira/browse/FLINK-36018
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
>
> h1. Background & Motivation
> We enabled autoscaler scaling for a few Flink production jobs. It works with 
> the Adaptive Scheduler and the Rescale API.
> Scaling results:
>  * The recommended parallelism meets expectations most of the time
>  * When the source traffic increases, the autoscaler scales up the job in 
> time to prevent lags.
>  * When the source traffic decreases, the autoscaler scales down the job in 
> time to save resources
>  * {color:#de350b}*Pain point:*{color} Each job rescales more than 20 times a 
> day (job.autoscaler.metrics.window=15 min by default). 
> As we all know, the job will be unavailable for a while during the restart 
> for several reasons:
>  * Cancel job
>  * Request resources( 
> [FLIP-472|https://cwiki.apache.org/confluence/display/FLINK/FLIP-472%3A+Aligning+timeout+logic+in+the+AdaptiveScheduler%27s+WaitingForResources+and+Executing+states]
>  is optimizing it)
>  * Initialize task
>  * Restore state
>  * Catch up lag during restart
>  * etc
> *{color:#de350b}Expectations:{color}*
>  * Scaling up in time to prevent lags.
>  * Lazy scaling down to reduce downtime and ensure resources can be released 
> later.
> h1. Solution:
> Introduce job.autoscaler.scale-down.lazy-period, the default value could be 
> 30 min.
> Detailed strategies:
>  * Record the start time of the first scale-down event for each vertex 
> separately. For example:
>  ** vertex1: 2024-08-09 01:35:02
>  ** vertex2: 2024-08-09 01:38:02
>  * Scaling down will be triggered for some cases:
>  ** Any vertex needs scale up
>  *** Job restart cannot be avoided, so trigger scale down for another vertex 
> as well if needed
>  *** After scale down, clean up the start time of scale-down.
>  ** The scale-down lazy period for some vertex has elapsed
>  *** current time - min(start time for each vertex) > scale-down.lazy-period
>  *** This means that there is no scaling up during the scaling down lazy 
> period
> Note1: If the recommended parallelism >= current parallelism, the start time 
> of scale-down will be cleaned up for the current vertex.
> Note2: The recommended parallelism still comes from the latest 15-minute 
> metrics. For example:
>  * The current parallelism of vertex1 is 100, and traffic decreases at 
> night.
>  * 2024-08-09 01:00:00, the recommended parallelism is 60.
>  ** The start time of scale down is 2024-08-09 01:00:00.
>  * 2024-08-09 01:15:00, the recommended parallelism is 50.
>  ** Still within the range of scale down lazy period.
>  ** Don't update the start time of scale down.
>  * 2024-08-09 01:31:00, the recommended parallelism is 40.
>  ** Outside of scale-down.lazy-period, trigger rescale, and use 40 as the 
> recommended parallelism.
>  ** The job.autoscaler.metrics.window is 15 min, so the metrics from 
> 2024-08-09 01:16:00 to 2024-08-09 01:31:00 are used.
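A minimal sketch of the bookkeeping this strategy describes (class and method 
names are illustrative, not actual autoscaler code):
{code:java}
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of the proposed lazy scale-down bookkeeping.
public class LazyScaleDownSketch {
    private final Duration lazyPeriod; // job.autoscaler.scale-down.lazy-period
    private final Clock clock;
    private final Map<String, Instant> firstScaleDownAt = new HashMap<>();

    public LazyScaleDownSketch(Duration lazyPeriod, Clock clock) {
        this.lazyPeriod = lazyPeriod;
        this.clock = clock;
    }

    /** Decide whether a vertex's recommendation should trigger a rescale now. */
    public boolean shouldRescale(
            String vertexId, int current, int recommended, boolean anyVertexScalesUp) {
        if (recommended >= current) {
            // Not a scale-down candidate any more: clear the recorded start time.
            firstScaleDownAt.remove(vertexId);
            return recommended > current; // scale up immediately
        }
        Instant start = firstScaleDownAt.computeIfAbsent(vertexId, v -> clock.instant());
        // Scale down if a restart is unavoidable anyway (some vertex scales up),
        // or once the lazy period has elapsed without any scale-up.
        return anyVertexScalesUp
                || Duration.between(start, clock.instant()).compareTo(lazyPeriod) > 0;
    }
}
{code}
After an actual scale-down, the recorded start times would be cleared again, 
matching the strategy above.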



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36022) When scaling.enabled=false, adjust the values of some parameters to provide better recommendation values.

2024-08-09 Thread yuanfenghu (Jira)
yuanfenghu created FLINK-36022:
--

 Summary: When scaling.enabled=false, adjust the values of 
some parameters to provide better recommendation values.
 Key: FLINK-36022
 URL: https://issues.apache.org/jira/browse/FLINK-36022
 Project: Flink
  Issue Type: Improvement
  Components: Autoscaler
Reporter: yuanfenghu


h1. Background

We have enabled the autoscaler in some scenarios, but we have not enabled 
job.autoscaler.scaling.enabled because we only want to use the autoscaler to 
obtain resource recommendations for jobs. However, some parameters can make 
these recommendations inaccurate.

Examples:
 * job.autoscaler.scale-down.max-factor

If set to 0.5, a vertex can be reduced to at most 50% of its current 
parallelism in one scaling step. If we do not turn on 
job.autoscaler.scaling.enabled, the recommended value for a vertex with 
parallelism 200 can therefore never go below 100, even though it may only need 
50 or even fewer resources during low-traffic periods.
 * job.autoscaler.restart.time

This parameter causes the restart time to be factored into the resources 
required to catch up data after a rescale, making the recommended resources 
too large. However, if job.autoscaler.scaling.enabled=false, the actual 
restart time is 0.
 
h1. Solution:

When job.autoscaler.scaling.enabled = false, actively override the above 
parameters: job.autoscaler.scale-down.max-factor=1 and 
job.autoscaler.restart.time=0.
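A sketch of the proposed effective settings in recommendation-only mode (the 
keys are autoscaler options; the override values are the ones proposed above):
{code:java}
import org.apache.flink.configuration.Configuration;

public class RecommendationOnlySketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Autoscaler observes and recommends, but does not rescale the job.
        conf.setString("job.autoscaler.enabled", "true");
        conf.setString("job.autoscaler.scaling.enabled", "false");
        // Proposed overrides so recommendations are neither clamped nor inflated:
        conf.setString("job.autoscaler.scale-down.max-factor", "1.0");
        conf.setString("job.autoscaler.restart.time", "0s");
        System.out.println(conf);
    }
}
{code}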
 
 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-36022) When scaling.enabled=false, adjust the values of some parameters to provide better recommendation values.

2024-08-09 Thread yuanfenghu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17872211#comment-17872211
 ] 

yuanfenghu commented on FLINK-36022:


[~gyfora] [~mxm] [~fanrui], 
could you share your opinions on this Jira? Thanks.




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-36192) Optimize the logic to make it the common divisor of the partition number of the data source when determining the parallelism of the source task.

2024-09-01 Thread yuanfenghu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17878458#comment-17878458
 ] 

yuanfenghu commented on FLINK-36192:


[~gyfora] 
Hi, could you take a look at this issue? If possible, I would like to take on 
this part of the work.

CC [~fanrui] 
 




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36192) Optimize the logic to make it the common divisor of the partition number of the data source when determining the parallelism of the source task.

2024-09-01 Thread yuanfenghu (Jira)
yuanfenghu created FLINK-36192:
--

 Summary: Optimize the logic to make it the common divisor of the 
partition number of the data source when determining the parallelism of the 
source task.
 Key: FLINK-36192
 URL: https://issues.apache.org/jira/browse/FLINK-36192
 Project: Flink
  Issue Type: Improvement
  Components: Autoscaler
Reporter: yuanfenghu


*Description:*
When the number of partitions of a Kafka source is known, we want the 
parallelism of the tasks consuming Kafka to be a divisor of the partition 
count, so that consumption is balanced across subtasks.
 
{*}current logic{*}:

Currently, the parallelism of tasks in the autoscaler is determined as follows:

step 1: calculate the task's target processing rate and the corresponding 
parallelism p1.

step 2: using p1 and the operator's maximum parallelism, derive a divisor p2 
of the maximum parallelism. If p2 < maxParallelism / 2, use p2 as the final 
parallelism; if p2 > maxParallelism / 2, use p1 as the final parallelism.

If the task being considered consumes Kafka or Pulsar, the task's maximum 
parallelism is determined first: if the number of partitions < the current 
task's maximum parallelism, the task's maximum parallelism becomes the number 
of Kafka or Pulsar partitions; otherwise the maximum parallelism remains 
unchanged. This leads to the following situations:

When the number of Kafka or Pulsar partitions is less than the operator's 
maximum parallelism:

1. If the parallelism calculated in step 1 is < partitions / 2, the 
requirement is met and consumption can be balanced.

2. If the parallelism calculated in step 1 is > partitions / 2, the step-1 
parallelism is used and consumption becomes unbalanced. For example, if Kafka 
has 64 partitions and the expected parallelism from step 1 is 48, the final 
parallelism is 48.

When the number of Kafka or Pulsar partitions is greater than the operator's 
maximum parallelism:

The parallelism is calculated entirely by the generic logic above. For 
example, if my Kafka topic has 200 partitions and the operator's maximum 
parallelism is 128, the candidate parallelisms are 2, 4, 8, 16..., so Kafka 
very likely cannot be consumed evenly.

 
{*}expect logic{*}:
 * When the number of partitions is less than the maximum parallelism, choose 
the task's parallelism to be a divisor of the number of partitions.
 * When the number of partitions is greater than the maximum parallelism, 
choose the task's parallelism to be a divisor of the number of partitions that 
does not exceed the maximum parallelism.
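A small sketch of the proposed selection, assuming we simply take the largest 
divisor of the partition count within the given bounds (illustrative only, not 
the actual JobVertexScaler code):
{code:java}
public class BalancedSourceParallelismSketch {
    /**
     * Largest divisor of the partition count that exceeds neither the desired
     * parallelism nor the operator's max parallelism, so that every subtask
     * reads the same number of partitions.
     */
    static int balancedParallelism(int partitions, int desired, int maxParallelism) {
        int cap = Math.min(Math.min(desired, partitions), maxParallelism);
        for (int p = cap; p >= 1; p--) {
            if (partitions % p == 0) {
                return p;
            }
        }
        return 1;
    }

    public static void main(String[] args) {
        System.out.println(balancedParallelism(64, 48, 128));   // 32: 2 partitions each
        System.out.println(balancedParallelism(200, 150, 128)); // 100: 2 partitions each
    }
}
{code}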
 
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-36192) Optimize the logic to make it the common divisor of the partition number of the data source when determining the parallelism of the source task.

2024-09-02 Thread yuanfenghu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17878679#comment-17878679
 ] 

yuanfenghu commented on FLINK-36192:


> To fix the issue, we may have to add an explicit metric for the number of 
> source partitions. I don't really see other options because we need both the 
> maximum parallelism and the number of source partitions to perform the key 
> alignment. We can't lower the maximum parallelism like we have previously 
> done when we want to support use cases where the number of partitions exceeds 
> the maximum parallelism, and we also need to ensure the maximum parallelism 
> is preserved to prevent exceeding it.

+1, 
We need to add a partition metric to the source so that the parallelism can be 
determined later (key alignment, partition alignment).
Key alignment can continue to use the previous logic; see 
[alignment logic| 
https://github.com/apache/flink-kubernetes-operator/blob/d8568ae28b13f5cc649a83f174dbc88449f0c602/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java#L299]
 
Partition alignment, however, may not work the same way as key alignment.

When the parallelism P1 calculated by the autoscaler logic is > partitions / 2, 
there are two situations:

1. When the number of partitions is less than maxParallelism, use the number 
of partitions as the parallelism (I think this is optimal).

2. When the number of partitions is greater than maxParallelism, we have three 
ways to select the final parallelism: {color:#de350b}select P1 as the final 
parallelism{color}, {color:#4c9aff}select partitions / 2 as the final 
parallelism (this ensures that the partitions are aligned),{color} 
{color:#ffbdad}or select maxParallelism as the final parallelism.{color}
I'm not sure which of these three methods is better. In my opinion, I'm more 
inclined to try the second one ({color:#4c9aff}partitions / 2 as the final 
parallelism{color}).
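To make the trade-off concrete, a small illustration with assumed numbers 
(partitions = 200, maxParallelism = 128, and a hypothetical autoscaler result 
P1 = 96):
{code:java}
public class PartitionAlignmentExample {
    public static void main(String[] args) {
        int partitions = 200, maxParallelism = 128, p1 = 96; // p1 is assumed
        // Option 1: keep P1; 200 % 96 = 8, so 8 subtasks read one extra partition.
        System.out.println("P1 = " + p1 + ", leftover = " + partitions % p1);
        // Option 2: partitions / 2 = 100; every subtask reads exactly 2 partitions.
        System.out.println("partitions/2 = " + partitions / 2
                + ", leftover = " + partitions % (partitions / 2));
        // Option 3: maxParallelism = 128; 72 subtasks read 2 partitions, 56 read 1.
        System.out.println("maxParallelism = " + maxParallelism);
    }
}
{code}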


[jira] [Commented] (FLINK-36192) Optimize the logic to make it the common divisor of the partition number of the data source when determining the parallelism of the source task.

2024-09-02 Thread yuanfenghu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17878715#comment-17878715
 ] 

yuanfenghu commented on FLINK-36192:


{quote}{color:#172b4d}In addition to the above discussion, when the recommended 
parallelism is limited due to source partition or maxParallelism causing the 
job to lag, could we record some events to remind the user?{color}
{quote}
+1 to this suggestion.

 

Something like *eventType='Warning', reason='scaleUpLimited', 
message='currently unable to scale up due to the limit of the number of source 
partitions.'* ?




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-36192) Optimize the logic to make it the common divisor of the partition number of the data source when determining the parallelism of the source task.

2024-09-03 Thread yuanfenghu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17878747#comment-17878747
 ] 

yuanfenghu commented on FLINK-36192:


{quote}Overall LGTM, could we involve the vertex id in the message? It's useful 
to distinguish which source is limited when one job has multiple sources.
{quote}
How about this:

*vertex \{xxx} unable to scale up due to the limit of the (number of source 
partitions \ maxParallelism). expected parallelism: xxx, actual (partition \ 
maxParallelism): xxx*

> Optimize the logic to make it the common divisor of the partition number of 
> the data source when determining the parallelism of the source task.
> 
>
> Key: FLINK-36192
> URL: https://issues.apache.org/jira/browse/FLINK-36192
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler
>Reporter: yuanfenghu
>Priority: Minor
>
> *Description:*
> We hope that when the number of partitions of the Kafka source is known, we 
> can try our best to make the parallelism of the tasks that consume Kafka a 
> common divisor of the partition count, so that consumption can be balanced.
>  
> {*}current logic{*}:
> Currently, the parallelism of tasks in the autoscaler is determined as 
> follows:
> step1: Calculate the target processing rate of the task and the 
> corresponding parallelism p1.
> step2: Using p1 and the maximum parallelism of the operator, look for a 
> divisor p2 of the maximum parallelism. If p2 < maxParallelism / 2, use p2 as 
> the final parallelism; if p2 > maxParallelism / 2, use p1 as the final 
> parallelism.
> If the task being evaluated consumes Kafka or Pulsar, its maximum 
> parallelism is determined first: if the number of partitions < the maximum 
> parallelism of the current task, then the maximum parallelism of the current 
> task becomes the number of Kafka or Pulsar partitions; otherwise the maximum 
> parallelism remains unchanged. This leads to the following situations:
> When the number of partitions in Kafka or Pulsar is less than the maximum 
> parallelism of the operator:
> 1. If the parallelism calculated in step 1 < the number of partitions / 2, 
> the demand is met and consumption across tasks can be balanced.
> 2. If the parallelism calculated in step 1 > the number of Kafka or Pulsar 
> partitions / 2, the parallelism from step 1 is used, and consumption becomes 
> unbalanced. For example, if the number of partitions in Kafka is 64 and the 
> expected parallelism calculated in step 1 is 48, the final task parallelism 
> is 48.
> When the number of partitions in Kafka or Pulsar is greater than the maximum 
> parallelism of the operator:
> The parallelism is calculated entirely by the step 1 logic. For example, if 
> one of my Kafka topics has 200 partitions and the maximum parallelism of the 
> operator is 128, the calculated candidate parallelisms are 2, 4, 8, 16..., 
> so it is very likely that Kafka cannot be consumed evenly.
>  
> {*}expected logic{*}:
>  * When the number of partitions is less than the maximum parallelism, 
> determine the task's parallelism as a common divisor of the number of 
> partitions.
>  * When the number of partitions is greater than the maximum parallelism, 
> determine the task's parallelism as a common divisor of the number of 
> partitions that does not exceed the maximum parallelism (see the sketch 
> after this description).
>  
>  
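
A minimal sketch of the proposed rule (a hypothetical helper, not the actual 
autoscaler code): take the largest divisor of the partition count that does 
not exceed the target parallelism, the vertex max parallelism, or the 
partition count itself, so that every subtask reads the same number of 
partitions.

{code:java}
public class SourceParallelismSketch {
    // Hypothetical helper illustrating the "common divisor" rule from this ticket.
    static int pickSourceParallelism(int targetParallelism, int numPartitions, int maxParallelism) {
        int upperBound = Math.min(Math.min(targetParallelism, maxParallelism), numPartitions);
        // Walk down from the upper bound until we hit a divisor of the partition count.
        for (int p = upperBound; p >= 1; p--) {
            if (numPartitions % p == 0) {
                return p; // each subtask then consumes numPartitions / p partitions
            }
        }
        return 1;
    }

    public static void main(String[] args) {
        // The 64-partition example from the description.
        System.out.println(pickSourceParallelism(48, 64, 128));
    }
}
{code}

For the 64-partition example in the description, pickSourceParallelism(48, 64, 
128) returns 32 (two partitions per subtask) instead of 48, where 32 subtasks 
would read one partition and 16 would read two.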



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-36192) Optimize the logic to make it the common divisor of the partition number of the data source when determining the parallelism of the source task.

2024-09-03 Thread yuanfenghu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17878747#comment-17878747
 ] 

yuanfenghu edited comment on FLINK-36192 at 9/3/24 7:18 AM:


{quote}Overall LGTM, could we involve the vertex id in the message? It's useful 
to distinguish which source is limited when one job has multiple sources.
{quote}
How about this:

*vertex \{xxx} unable to scale up due to the limit of the (number of source 
partitions / maxParallelism). expected parallelism: xxx, actual (partition or 
maxParallelism): xxx*
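
If it helps, a hypothetical sketch of how the message could be assembled 
(vertexId, expectedParallelism and actualLimit are placeholder names, not 
actual autoscaler fields):

{code:java}
// Hypothetical wiring; the variables are placeholders for this sketch only.
String message = String.format(
        "Vertex %s unable to scale up due to the limit of the number of source "
                + "partitions / maxParallelism. Expected parallelism: %d, "
                + "actual (partition or maxParallelism): %d",
        vertexId, expectedParallelism, actualLimit);
{code}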


was (Author: JIRAUSER296932):
{quote}Overall LGTM, could we involve the vertex id in the message? It's useful 
to distinguish which source is limited when one job has multiple sources.
{quote}
How about this:

*vertex \{xxx} unable to scale up due to the limit of the (number of source 
partitions / maxParallelism). expected parallelism: xxx, actual (partition / 
maxParallelism): xxx*

> Optimize the logic to make it the common divisor of the partition number of 
> the data source when determining the parallelism of the source task.
> 
>
> Key: FLINK-36192
> URL: https://issues.apache.org/jira/browse/FLINK-36192
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler
>Reporter: yuanfenghu
>Priority: Minor
>
> *Description:*
> We hope that when the number of partitions of the Kafka source is known, we 
> can try our best to make the parallelism of the tasks that consume Kafka a 
> common divisor of the partition count, so that consumption can be balanced.
>  
> {*}current logic{*}:
> Currently, the parallelism of tasks in the autoscaler is determined as 
> follows:
> step1: Calculate the target processing rate of the task and the 
> corresponding parallelism p1.
> step2: Using p1 and the maximum parallelism of the operator, look for a 
> divisor p2 of the maximum parallelism. If p2 < maxParallelism / 2, use p2 as 
> the final parallelism; if p2 > maxParallelism / 2, use p1 as the final 
> parallelism.
> If the task being evaluated consumes Kafka or Pulsar, its maximum 
> parallelism is determined first: if the number of partitions < the maximum 
> parallelism of the current task, then the maximum parallelism of the current 
> task becomes the number of Kafka or Pulsar partitions; otherwise the maximum 
> parallelism remains unchanged. This leads to the following situations:
> When the number of partitions in Kafka or Pulsar is less than the maximum 
> parallelism of the operator:
> 1. If the parallelism calculated in step 1 < the number of partitions / 2, 
> the demand is met and consumption across tasks can be balanced.
> 2. If the parallelism calculated in step 1 > the number of Kafka or Pulsar 
> partitions / 2, the parallelism from step 1 is used, and consumption becomes 
> unbalanced. For example, if the number of partitions in Kafka is 64 and the 
> expected parallelism calculated in step 1 is 48, the final task parallelism 
> is 48.
> When the number of partitions in Kafka or Pulsar is greater than the maximum 
> parallelism of the operator:
> The parallelism is calculated entirely by the step 1 logic. For example, if 
> one of my Kafka topics has 200 partitions and the maximum parallelism of the 
> operator is 128, the calculated candidate parallelisms are 2, 4, 8, 16..., 
> so it is very likely that Kafka cannot be consumed evenly.
>  
> {*}expected logic{*}:
>  * When the number of partitions is less than the maximum parallelism, 
> determine the task's parallelism as a common divisor of the number of 
> partitions.
>  * When the number of partitions is greater than the maximum parallelism, 
> determine the task's parallelism as a common divisor of the number of 
> partitions that does not exceed the maximum parallelism.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-36192) Optimize the logic to make it the common divisor of the partition number of the data source when determining the parallelism of the source task.

2024-09-03 Thread yuanfenghu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17878747#comment-17878747
 ] 

yuanfenghu edited comment on FLINK-36192 at 9/3/24 7:18 AM:


{quote}Overall LGTM, could we involve the vertex id in the message? It's useful 
to distinguish which source is limited when one job has multiple sources.
{quote}
How about this:

*vertex \{xxx} unable to scale up due to the limit of the (number of source 
partitions / maxParallelism). expected parallelism: xxx, actual (partition / 
maxParallelism): xxx*


was (Author: JIRAUSER296932):
{quote}Overall LGTM, could we involve the vertex id in the message? It's useful 
to distinguish which source is limited when one job has multiple sources.
{quote}
How about this:

*vertex \{xxx} unable to scale up due to the limit of the (number of source 
partitions / maxParallelism). expected parallelism: xxx, actual (partition / 
maxParallelism): xxx*

> Optimize the logic to make it the common divisor of the partition number of 
> the data source when determining the parallelism of the source task.
> 
>
> Key: FLINK-36192
> URL: https://issues.apache.org/jira/browse/FLINK-36192
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler
>Reporter: yuanfenghu
>Priority: Minor
>
> *Description:*
> We hope that when the number of partitions of the Kafka source is known, we 
> can try our best to make the parallelism of the tasks that consume Kafka a 
> common divisor of the partition count, so that consumption can be balanced.
>  
> {*}current logic{*}:
> Currently, the parallelism of tasks in the autoscaler is determined as 
> follows:
> step1: Calculate the target processing rate of the task and the 
> corresponding parallelism p1.
> step2: Using p1 and the maximum parallelism of the operator, look for a 
> divisor p2 of the maximum parallelism. If p2 < maxParallelism / 2, use p2 as 
> the final parallelism; if p2 > maxParallelism / 2, use p1 as the final 
> parallelism.
> If the task being evaluated consumes Kafka or Pulsar, its maximum 
> parallelism is determined first: if the number of partitions < the maximum 
> parallelism of the current task, then the maximum parallelism of the current 
> task becomes the number of Kafka or Pulsar partitions; otherwise the maximum 
> parallelism remains unchanged. This leads to the following situations:
> When the number of partitions in Kafka or Pulsar is less than the maximum 
> parallelism of the operator:
> 1. If the parallelism calculated in step 1 < the number of partitions / 2, 
> the demand is met and consumption across tasks can be balanced.
> 2. If the parallelism calculated in step 1 > the number of Kafka or Pulsar 
> partitions / 2, the parallelism from step 1 is used, and consumption becomes 
> unbalanced. For example, if the number of partitions in Kafka is 64 and the 
> expected parallelism calculated in step 1 is 48, the final task parallelism 
> is 48.
> When the number of partitions in Kafka or Pulsar is greater than the maximum 
> parallelism of the operator:
> The parallelism is calculated entirely by the step 1 logic. For example, if 
> one of my Kafka topics has 200 partitions and the maximum parallelism of the 
> operator is 128, the calculated candidate parallelisms are 2, 4, 8, 16..., 
> so it is very likely that Kafka cannot be consumed evenly.
>  
> {*}expected logic{*}:
>  * When the number of partitions is less than the maximum parallelism, 
> determine the task's parallelism as a common divisor of the number of 
> partitions.
>  * When the number of partitions is greater than the maximum parallelism, 
> determine the task's parallelism as a common divisor of the number of 
> partitions that does not exceed the maximum parallelism.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35462) Introduce a parameter to control the chaining mode between a Flink SQL connector sink or source and its adjacent operator.

2024-05-27 Thread yuanfenghu (Jira)
yuanfenghu created FLINK-35462:
--

 Summary: Introduce a parameter to control the chaining mode 
between a Flink SQL connector sink or source and its adjacent operator.
 Key: FLINK-35462
 URL: https://issues.apache.org/jira/browse/FLINK-35462
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Reporter: yuanfenghu


When using the flinksql connector, we can set the parallelism of the sink. 
However, when we modify the parallelism of the sink to be the same as the 
global one, the global chaining strategy may cause the two operators to be 
chained together, which can make the task unable to proceed when recovering 
from a checkpoint or savepoint. We should allow explicit specification of the 
chaining mode of the connector.
 
Related issues:

 https://github.com/apache/iceberg/issues/10371
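
For comparison, the DataStream API already exposes this control explicitly; a 
minimal sketch of the two existing switches (the SQL-level option requested 
here does not exist yet, and mySink is a placeholder for any Sink 
implementation):

{code:java}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Global switch: disables chaining for every operator in the job.
env.disableOperatorChaining();

// Per-operator switch: keeps just the sink out of its upstream chain.
// mySink is a placeholder; substitute any Sink implementation.
env.fromElements("a", "b", "c")
        .sinkTo(mySink)
        .disableChaining();
{code}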



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35462) Introduce a parameter to control the chaining mode between a Flink SQL connector sink or source and its adjacent operator.

2024-05-27 Thread yuanfenghu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35462?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuanfenghu updated FLINK-35462:
---
Description: 
When using the flinksql connector, we can set the parallelism of the sink. 
However, when we modify the parallelism of the sink to be the same as the 
global one, the global chaining strategy may cause the two operators to be 
chained together; when recovering from a checkpoint or savepoint, this can 
make the task unable to proceed. We should allow explicit specification of 
the chaining mode of the connector.
 
Related issues:

[https://github.com/apache/iceberg/issues/10371]

  was:
When using the flinksql connector, we can set the parallelism of the sink. 
However, when we modify the parallelism of the sink to be the same as the 
global one, the global chaining strategy may cause the two operators to be 
chained together, which can make the task unable to proceed when recovering 
from a checkpoint or savepoint. We should allow explicit specification of the 
chaining mode of the connector.
 
Related issues:

 https://github.com/apache/iceberg/issues/10371


> Introduce a parameter to control the chaining mode between a Flink SQL 
> connector sink or source and its adjacent operator.
> --
>
> Key: FLINK-35462
> URL: https://issues.apache.org/jira/browse/FLINK-35462
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: yuanfenghu
>Priority: Major
>
> When using the flinksql connector, we can set the parallelism of the sink. 
> However, when we modify the parallelism of the sink to be the same as the 
> global one, the global chaining strategy may cause the two operators to be 
> chained together; when recovering from a checkpoint or savepoint, this can 
> make the task unable to proceed. We should allow explicit specification of 
> the chaining mode of the connector.
>  
> Related issues:
> [https://github.com/apache/iceberg/issues/10371]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35462) Introduce a parameter to control the chaining mode between a Flink SQL connector sink or source and its adjacent operator.

2024-05-27 Thread yuanfenghu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35462?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuanfenghu updated FLINK-35462:
---
Description: 
When using the flinksql connector, we can set the parallelism of the sink. 
However, when we modify the parallelism of the sink to be the same as the 
global one, the global chaining strategy may cause the two operators to be 
chained together; when using checkpoint or savepoint recovery, this can make 
the task unable to proceed. We should allow explicit specification of the 
chaining mode of the connector.
 
Related issues:

[https://github.com/apache/iceberg/issues/10371]

  was:
When using the flinksql connector, we can set the parallelism of the sink. 
However, when we modify the parallelism of the sink to be the same as the 
global one, the global chaining strategy may cause the two operators to be 
chained together; when recovering from a checkpoint or savepoint, this can 
make the task unable to proceed. We should allow explicit specification of 
the chaining mode of the connector.
 
Related issues:

[https://github.com/apache/iceberg/issues/10371]


> Introduce a parameter to control the chaining mode between a Flink SQL 
> connector sink or source and its adjacent operator.
> --
>
> Key: FLINK-35462
> URL: https://issues.apache.org/jira/browse/FLINK-35462
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: yuanfenghu
>Priority: Major
>
> When using the flinksql connector, we can set the parallelism of the sink. 
> However, when we modify the parallelism of the sink to be the same as the 
> global one, the global chaining strategy may cause the two operators to be 
> chained together; when using checkpoint or savepoint recovery, this can make 
> the task unable to proceed. We should allow explicit specification of the 
> chaining mode of the connector.
>  
> Related issues:
> [https://github.com/apache/iceberg/issues/10371]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33087) FlinkSql unable to parse field annotation information

2023-09-14 Thread yuanfenghu (Jira)
yuanfenghu created FLINK-33087:
--

 Summary: FlinkSql unable to parse field annotation information
 Key: FLINK-33087
 URL: https://issues.apache.org/jira/browse/FLINK-33087
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Affects Versions: 1.17.1
Reporter: yuanfenghu






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33087) FlinkSql unable to parse field annotation information

2023-09-14 Thread yuanfenghu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33087?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuanfenghu updated FLINK-33087:
---
Description: 
If you use flinksql to create a table:
{code:java}
CREATE TABLE `ods_kafka_stream_taf_report_lakehouse_pipeline_log_tes2t` (
    `id` STRING PRIMARY KEY NOT ENFORCED COMMENT 'test_comment', 
    `gender` STRING COMMENT 'test_comment', )
COMMENT 'test'
 WITH (
...
); {code}

> FlinkSql unable to parse field annotation information
> -
>
> Key: FLINK-33087
> URL: https://issues.apache.org/jira/browse/FLINK-33087
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.17.1
>Reporter: yuanfenghu
>Priority: Minor
>
> If you use flinksql to create a table:
> {code:java}
> CREATE TABLE `ods_kafka_stream_taf_report_lakehouse_pipeline_log_tes2t` (
>     `id` STRING PRIMARY KEY NOT ENFORCED COMMENT 'test_comment', 
>     `gender` STRING COMMENT 'test_comment', )
> COMMENT 'test'
>  WITH (
> ...
> ); {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33087) FlinkSql unable to parse field annotation information

2023-09-14 Thread yuanfenghu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33087?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuanfenghu updated FLINK-33087:
---
Description: 
If you use flinksql to create a table:
{code:java}
CREATE TABLE `test_table` (
    `id` STRING PRIMARY KEY NOT ENFORCED COMMENT 'test_comment', 
    `gender` STRING COMMENT 'test_comment', )
COMMENT 'test'
 WITH (
...
); {code}
We cannot get the comment information of the columns.

  was:
If you use flinksql to create a table:
{code:java}
CREATE TABLE `ods_kafka_stream_taf_report_lakehouse_pipeline_log_tes2t` (
    `id` STRING PRIMARY KEY NOT ENFORCED COMMENT 'test_comment', 
    `gender` STRING COMMENT 'test_comment', )
COMMENT 'test'
 WITH (
...
); {code}


> FlinkSql unable to parse field annotation information
> -
>
> Key: FLINK-33087
> URL: https://issues.apache.org/jira/browse/FLINK-33087
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.17.1
>Reporter: yuanfenghu
>Priority: Minor
>
> If you use flinksql to create a table:
> {code:java}
> CREATE TABLE `test_table` (
>     `id` STRING PRIMARY KEY NOT ENFORCED COMMENT 'test_comment', 
>     `gender` STRING COMMENT 'test_comment', )
> COMMENT 'test'
>  WITH (
> ...
> ); {code}
> We cannot get the comment information of the columns.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33087) FlinkSql unable to parse field annotation information

2023-09-14 Thread yuanfenghu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33087?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuanfenghu updated FLINK-33087:
---
Description: 
If you use flinksql to create a table:
{code:java}
CREATE TABLE `test_table` (
    `id` STRING PRIMARY KEY NOT ENFORCED COMMENT 'test_comment', 
    `gender` STRING COMMENT 'test_comment', )
COMMENT 'test'
 WITH (
...
); {code}
We cannot get the annotation information of the column

org.apache.flink.table.planner.operations.SqlCreateTableConverter#createCatalogTable

It does not parse out the comment information of the columns.

  was:
If you use flinksql to create a table:
{code:java}
CREATE TABLE `test_table` (
    `id` STRING PRIMARY KEY NOT ENFORCED COMMENT 'test_comment', 
    `gender` STRING COMMENT 'test_comment', )
COMMENT 'test'
 WITH (
...
); {code}
We cannot get the comment information of the columns.


> FlinkSql unable to parse field annotation information
> -
>
> Key: FLINK-33087
> URL: https://issues.apache.org/jira/browse/FLINK-33087
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.17.1
>Reporter: yuanfenghu
>Priority: Minor
>
> If you use flinksql to create a table:
> {code:java}
> CREATE TABLE `test_table` (
>     `id` STRING PRIMARY KEY NOT ENFORCED COMMENT 'test_comment', 
>     `gender` STRING COMMENT 'test_comment', )
> COMMENT 'test'
>  WITH (
> ...
> ); {code}
> We cannot get the annotation information of the column
> org.apache.flink.table.planner.operations.SqlCreateTableConverter#createCatalogTable
> It does not parse out the comment information of the columns.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33087) FlinkSql unable to parse field annotation information

2023-09-14 Thread yuanfenghu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33087?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuanfenghu updated FLINK-33087:
---
Description: 
If you use flinksql to create a table:
{code:java}
CREATE TABLE `test_table` (
    `id` STRING PRIMARY KEY NOT ENFORCED COMMENT 'test_comment', 
    `gender` STRING COMMENT 'test_comment', )
COMMENT 'test'
 WITH (
...
); {code}
We cannot get the annotation information of the column

org.apache.flink.table.planner.operations.SqlCreateTableConverter#createCatalogTable
The comment of the column is not parsed. This is related to FLIP-164.

  was:
If you use flinksql to create a table:
{code:java}
CREATE TABLE `test_table` (
    `id` STRING PRIMARY KEY NOT ENFORCED COMMENT 'test_comment', 
    `gender` STRING COMMENT 'test_comment', )
COMMENT 'test'
 WITH (
...
); {code}
We cannot get the annotation information of the column

org.apache.flink.table.planner.operations.SqlCreateTableConverter#createCatalogTable

It does not parse out the comment information of the columns.


> FlinkSql unable to parse field annotation information
> -
>
> Key: FLINK-33087
> URL: https://issues.apache.org/jira/browse/FLINK-33087
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.17.1
>Reporter: yuanfenghu
>Priority: Minor
>
> If you use flinksql to create a table:
> {code:java}
> CREATE TABLE `test_table` (
>     `id` STRING PRIMARY KEY NOT ENFORCED COMMENT 'test_comment', 
>     `gender` STRING COMMENT 'test_comment', )
> COMMENT 'test'
>  WITH (
> ...
> ); {code}
> We cannot get the annotation information of the column
> org.apache.flink.table.planner.operations.SqlCreateTableConverter#createCatalogTable
> The comment of the column is not parsed. This is related to FLIP-164.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33087) FlinkSql unable to parse field annotation information

2023-09-14 Thread yuanfenghu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33087?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuanfenghu updated FLINK-33087:
---
Description: 
If you use flinksql to create a table:
{code:java}
CREATE TABLE `test_table` (
    `id` STRING PRIMARY KEY NOT ENFORCED COMMENT 'test_comment', 
    `gender` STRING COMMENT 'test_comment', )
COMMENT 'test'
 WITH (
...
); {code}
We cannot get the comment of the column.

org.apache.flink.table.planner.operations.SqlCreateTableConverter#createCatalogTable
The comment of the column is not parsed. This is related to FLIP-164
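
A hedged sketch of where the comment should surface once parsing works 
(assuming a Flink version where Column#getComment() is available; the DDL is 
just the example above, using the datagen connector so it runs standalone):

{code:java}
TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
tEnv.executeSql(
        "CREATE TABLE test_table ("
                + " id STRING COMMENT 'test_comment',"
                + " gender STRING COMMENT 'test_comment'"
                + ") WITH ('connector' = 'datagen')");

// Each Column in the resolved schema carries an Optional<String> comment;
// with this bug it comes back empty even though the DDL declared one.
ResolvedSchema schema = tEnv.from("test_table").getResolvedSchema();
schema.getColumns().forEach(
        c -> System.out.println(c.getName() + " -> " + c.getComment()));
{code}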
 

  was:
If you use flinksql to create a table:
{code:java}
CREATE TABLE `test_table` (
    `id` STRING PRIMARY KEY NOT ENFORCED COMMENT 'test_comment', 
    `gender` STRING COMMENT 'test_comment', )
COMMENT 'test'
 WITH (
...
); {code}
We cannot get the annotation information of the column

org.apache.flink.table.planner.operations.SqlCreateTableConverter#createCatalogTable
The comment of the column is not parsed. This is related to FLIP-164.


> FlinkSql unable to parse field annotation information
> -
>
> Key: FLINK-33087
> URL: https://issues.apache.org/jira/browse/FLINK-33087
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.17.1
>Reporter: yuanfenghu
>Priority: Minor
>
> If you use flinksql to create a table:
> {code:java}
> CREATE TABLE `test_table` (
>     `id` STRING PRIMARY KEY NOT ENFORCED COMMENT 'test_comment', 
>     `gender` STRING COMMENT 'test_comment', )
> COMMENT 'test'
>  WITH (
> ...
> ); {code}
> We cannot get the comment of the column.
> org.apache.flink.table.planner.operations.SqlCreateTableConverter#createCatalogTable
> The comment of the column is not parsed. This is related to FLIP-164
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33087) FlinkSql unable to parse column comment information

2023-09-14 Thread yuanfenghu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33087?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuanfenghu updated FLINK-33087:
---
Summary: FlinkSql unable to parse column comment information  (was: 
FlinkSql unable to parse column annotation information)

> FlinkSql unable to parse column comment information
> ---
>
> Key: FLINK-33087
> URL: https://issues.apache.org/jira/browse/FLINK-33087
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.17.1
>Reporter: yuanfenghu
>Priority: Minor
>
> If you use flinksql to create a table:
> {code:java}
> CREATE TABLE `test_table` (
>     `id` STRING PRIMARY KEY NOT ENFORCED COMMENT 'test_comment', 
>     `gender` STRING COMMENT 'test_comment', )
> COMMENT 'test'
>  WITH (
> ...
> ); {code}
> We cannot get the comment of the column.
> org.apache.flink.table.planner.operations.SqlCreateTableConverter#createCatalogTable
> The comment of the column is not parsed. This is related to FLIP-164
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33087) FlinkSql unable to parse column annotation information

2023-09-14 Thread yuanfenghu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33087?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuanfenghu updated FLINK-33087:
---
Summary: FlinkSql unable to parse column annotation information  (was: 
FlinkSql unable to parse field annotation information)

> FlinkSql unable to parse column annotation information
> --
>
> Key: FLINK-33087
> URL: https://issues.apache.org/jira/browse/FLINK-33087
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.17.1
>Reporter: yuanfenghu
>Priority: Minor
>
> If you use flinksql to create a table:
> {code:java}
> CREATE TABLE `test_table` (
>     `id` STRING PRIMARY KEY NOT ENFORCED COMMENT 'test_comment', 
>     `gender` STRING COMMENT 'test_comment', )
> COMMENT 'test'
>  WITH (
> ...
> ); {code}
> We cannot get the comment of the column.
> org.apache.flink.table.planner.operations.SqlCreateTableConverter#createCatalogTable
> The comment of the column is not parsed. This is related to FLIP-164
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33087) FlinkSql unable to parse column comment

2023-09-14 Thread yuanfenghu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33087?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuanfenghu updated FLINK-33087:
---
Summary: FlinkSql unable to parse column comment  (was: FlinkSql unable to 
parse column comment information)

> FlinkSql unable to parse column comment
> ---
>
> Key: FLINK-33087
> URL: https://issues.apache.org/jira/browse/FLINK-33087
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.17.1
>Reporter: yuanfenghu
>Priority: Minor
>
> If you use flinksql to create a table:
> {code:java}
> CREATE TABLE `test_table` (
>     `id` STRING PRIMARY KEY NOT ENFORCED COMMENT 'test_comment', 
>     `gender` STRING COMMENT 'test_comment', )
> COMMENT 'test'
>  WITH (
> ...
> ); {code}
> We cannot get the comment of the column.
> org.apache.flink.table.planner.operations.SqlCreateTableConverter#createCatalogTable
> The comment of the column is not parsed. This is related to FLIP-164
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-33087) FlinkSql unable to parse column comment

2023-09-14 Thread yuanfenghu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33087?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuanfenghu closed FLINK-33087.
--
Resolution: Fixed

> FlinkSql unable to parse column comment
> ---
>
> Key: FLINK-33087
> URL: https://issues.apache.org/jira/browse/FLINK-33087
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.17.1
>Reporter: yuanfenghu
>Priority: Minor
>
> If you use flinksql to create a table:
> {code:java}
> CREATE TABLE `test_table` (
>     `id` STRING PRIMARY KEY NOT ENFORCED COMMENT 'test_comment', 
>     `gender` STRING COMMENT 'test_comment', )
> COMMENT 'test'
>  WITH (
> ...
> ); {code}
> We cannot get the comment of the column.
> org.apache.flink.table.planner.operations.SqlCreateTableConverter#createCatalogTable
> The comment of the column is not parsed. This is related to FLIP-164
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33087) FlinkSql unable to parse column comment

2023-09-14 Thread yuanfenghu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33087?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuanfenghu updated FLINK-33087:
---
Affects Version/s: 1.16.2
   (was: 1.17.1)

> FlinkSql unable to parse column comment
> ---
>
> Key: FLINK-33087
> URL: https://issues.apache.org/jira/browse/FLINK-33087
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.16.2
>Reporter: yuanfenghu
>Priority: Minor
>
> If you use flinksql to create a table:
> {code:java}
> CREATE TABLE `test_table` (
>     `id` STRING PRIMARY KEY NOT ENFORCED COMMENT 'test_comment', 
>     `gender` STRING COMMENT 'test_comment', )
> COMMENT 'test'
>  WITH (
> ...
> ); {code}
> We cannot get the comment of the column.
> org.apache.flink.table.planner.operations.SqlCreateTableConverter#createCatalogTable
> The comment of the column is not parsed. This is related to FLIP-164
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-25373) task manager can not free memory when jobs are finished

2023-09-25 Thread yuanfenghu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17769033#comment-17769033
 ] 

yuanfenghu commented on FLINK-25373:


I also encountered the same problem

> task manager can not free memory when jobs are finished
> ---
>
> Key: FLINK-25373
> URL: https://issues.apache.org/jira/browse/FLINK-25373
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.14.0
> Environment: flink 1.14.0
>Reporter: Spongebob
>Priority: Major
> Attachments: image-2021-12-19-11-48-33-622.png, 
> image-2022-03-11-10-06-19-499.png
>
>
> I submit my Flinksql jobs to the Flink standalone cluster and, contrary to 
> my expectation, TaskManagers could not free memory when all jobs were 
> finished, whether normally or not.
> I also found that there were many threads named like 
> `flink-taskexecutor-io-thread-x` whose states were waiting on conditions.
> Here is the detail of these threads:
>  
> "flink-taskexecutor-io-thread-31" Id=5386 WAITING on 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@2da8b14c
> at sun.misc.Unsafe.park(Native Method)
> - waiting on 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@2da8b14c
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
> at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
> at 
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> !image-2021-12-19-11-48-33-622.png!
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-25373) task manager can not free memory when jobs are finished

2023-09-25 Thread yuanfenghu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17769033#comment-17769033
 ] 

yuanfenghu edited comment on FLINK-25373 at 9/26/23 6:58 AM:
-

I also encountered the same problem
 


was (Author: JIRAUSER296932):
I also encountered the same problem

> task manager can not free memory when jobs are finished
> ---
>
> Key: FLINK-25373
> URL: https://issues.apache.org/jira/browse/FLINK-25373
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.14.0
> Environment: flink 1.14.0
>Reporter: Spongebob
>Priority: Major
> Attachments: image-2021-12-19-11-48-33-622.png, 
> image-2022-03-11-10-06-19-499.png
>
>
> I submit my Flinksql jobs to the Flink standalone cluster and, contrary to 
> my expectation, TaskManagers could not free memory when all jobs were 
> finished, whether normally or not.
> I also found that there were many threads named like 
> `flink-taskexecutor-io-thread-x` whose states were waiting on conditions.
> Here is the detail of these threads:
>  
> "flink-taskexecutor-io-thread-31" Id=5386 WAITING on 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@2da8b14c
> at sun.misc.Unsafe.park(Native Method)
> - waiting on 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@2da8b14c
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
> at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
> at 
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> !image-2021-12-19-11-48-33-622.png!
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-25373) task manager can not free memory when jobs are finished

2023-09-26 Thread yuanfenghu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17769033#comment-17769033
 ] 

yuanfenghu edited comment on FLINK-25373 at 9/26/23 7:01 AM:
-

I also encountered the same problem
Hi,  [~xtsong] 
Our tm runs in docker. This problem will be triggered after the task is stopped 
on the original cluster or the task encounters an abnormal restart, which will 
cause these containers to be killed.
 


was (Author: JIRAUSER296932):
I also encountered the same problem
 [~xtsong] 
Our tm runs in docker. This problem will be triggered after the task is stopped 
on the original cluster or the task encounters an abnormal restart, which will 
cause these containers to be killed.
 

> task manager can not free memory when jobs are finished
> ---
>
> Key: FLINK-25373
> URL: https://issues.apache.org/jira/browse/FLINK-25373
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.14.0
> Environment: flink 1.14.0
>Reporter: Spongebob
>Priority: Major
> Attachments: image-2021-12-19-11-48-33-622.png, 
> image-2022-03-11-10-06-19-499.png
>
>
> I submit my Flinksql jobs to the Flink standalone cluster and, contrary to 
> my expectation, TaskManagers could not free memory when all jobs were 
> finished, whether normally or not.
> I also found that there were many threads named like 
> `flink-taskexecutor-io-thread-x` whose states were waiting on conditions.
> Here is the detail of these threads:
>  
> "flink-taskexecutor-io-thread-31" Id=5386 WAITING on 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@2da8b14c
> at sun.misc.Unsafe.park(Native Method)
> - waiting on 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@2da8b14c
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
> at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
> at 
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> !image-2021-12-19-11-48-33-622.png!
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-25373) task manager can not free memory when jobs are finished

2023-09-26 Thread yuanfenghu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17769033#comment-17769033
 ] 

yuanfenghu edited comment on FLINK-25373 at 9/26/23 7:01 AM:
-

I also encountered the same problem
 [~xtsong] 
Our tm runs in docker. This problem will be triggered after the task is stopped 
on the original cluster or the task encounters an abnormal restart, which will 
cause these containers to be killed.
 


was (Author: JIRAUSER296932):
I also encountered the same problem
 

> task manager can not free memory when jobs are finished
> ---
>
> Key: FLINK-25373
> URL: https://issues.apache.org/jira/browse/FLINK-25373
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.14.0
> Environment: flink 1.14.0
>Reporter: Spongebob
>Priority: Major
> Attachments: image-2021-12-19-11-48-33-622.png, 
> image-2022-03-11-10-06-19-499.png
>
>
> I submit my Flinksql jobs to the Flink standalone cluster and, contrary to 
> my expectation, TaskManagers could not free memory when all jobs were 
> finished, whether normally or not.
> I also found that there were many threads named like 
> `flink-taskexecutor-io-thread-x` whose states were waiting on conditions.
> Here is the detail of these threads:
>  
> "flink-taskexecutor-io-thread-31" Id=5386 WAITING on 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@2da8b14c
> at sun.misc.Unsafe.park(Native Method)
> - waiting on 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@2da8b14c
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
> at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
> at 
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> !image-2021-12-19-11-48-33-622.png!
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-25373) task manager can not free memory when jobs are finished

2023-09-26 Thread yuanfenghu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17769033#comment-17769033
 ] 

yuanfenghu edited comment on FLINK-25373 at 9/26/23 7:04 AM:
-

I also encountered the same problem
Hi,  [~xtsong] 
Our tm runs in docker. This problem will be triggered after the task is stopped 
on the original cluster or the task encounters an abnormal restart, which will 
cause these containers to be killed.
I think this part is not heap memory, because I saw through jvm that my heap 
memory is only 5G but the top command shows more than 15G. 
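
A minimal sketch of the check I mean (plain java.lang.management APIs; 
whatever top reports beyond these two numbers is not covered here, e.g. 
direct buffers, mmap'd files, or other native allocations):

{code:java}
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;

public class MemCheck {
    public static void main(String[] args) {
        MemoryMXBean mem = ManagementFactory.getMemoryMXBean();
        // Heap usage as the JVM sees it (the ~5G figure above).
        System.out.println("heap:     " + mem.getHeapMemoryUsage());
        // Metaspace, code cache, etc.; still far below what top shows.
        System.out.println("non-heap: " + mem.getNonHeapMemoryUsage());
    }
}
{code}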
 

 


was (Author: JIRAUSER296932):
I also encountered the same problem
Hi,  [~xtsong] 
Our tm runs in docker. This problem will be triggered after the task is stopped 
on the original cluster or the task encounters an abnormal restart, which will 
cause these containers to be killed.
 

> task manager can not free memory when jobs are finished
> ---
>
> Key: FLINK-25373
> URL: https://issues.apache.org/jira/browse/FLINK-25373
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.14.0
> Environment: flink 1.14.0
>Reporter: Spongebob
>Priority: Major
> Attachments: image-2021-12-19-11-48-33-622.png, 
> image-2022-03-11-10-06-19-499.png
>
>
> I submit my Flinksql jobs to the Flink standalone cluster and, contrary to 
> my expectation, TaskManagers could not free memory when all jobs were 
> finished, whether normally or not.
> I also found that there were many threads named like 
> `flink-taskexecutor-io-thread-x` whose states were waiting on conditions.
> Here is the detail of these threads:
>  
> "flink-taskexecutor-io-thread-31" Id=5386 WAITING on 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@2da8b14c
> at sun.misc.Unsafe.park(Native Method)
> - waiting on 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@2da8b14c
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
> at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
> at 
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> !image-2021-12-19-11-48-33-622.png!
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-25373) task manager can not free memory when jobs are finished

2023-09-26 Thread yuanfenghu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17769033#comment-17769033
 ] 

yuanfenghu edited comment on FLINK-25373 at 9/26/23 7:48 AM:
-

I also encountered the same problem
Hi,  [~xtsong] 
Our tm runs in docker. This problem will be triggered after the task is stopped 
on the original cluster or the task encounters an abnormal restart, which will 
cause these containers to be killed.
I think this part is not heap memory, because I saw through jvm that my heap 
memory is only 5G but the top command shows more than 15G. 
I have 6 slots per tm, but there are more than 6 
org.apache.flink.runtime.memory.MemoryManager

!image-2023-09-26-15-48-25-221.png!

 


was (Author: JIRAUSER296932):
I also encountered the same problem
Hi,  [~xtsong] 
Our tm runs in docker. This problem will be triggered after the task is stopped 
on the original cluster or the task encounters an abnormal restart, which will 
cause these containers to be killed.
I think this part is not heap memory, because I saw through jvm that my heap 
memory is only 5G but the top command shows more than 15G. 
 

 

> task manager can not free memory when jobs are finished
> ---
>
> Key: FLINK-25373
> URL: https://issues.apache.org/jira/browse/FLINK-25373
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.14.0
> Environment: flink 1.14.0
>Reporter: Spongebob
>Priority: Major
> Attachments: image-2021-12-19-11-48-33-622.png, 
> image-2022-03-11-10-06-19-499.png, image-2023-09-26-15-48-25-221.png
>
>
> I submit my Flinksql jobs to the Flink standalone cluster and, contrary to 
> my expectation, TaskManagers could not free memory when all jobs were 
> finished, whether normally or not.
> I also found that there were many threads named like 
> `flink-taskexecutor-io-thread-x` whose states were waiting on conditions.
> Here is the detail of these threads:
>  
> "flink-taskexecutor-io-thread-31" Id=5386 WAITING on 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@2da8b14c
> at sun.misc.Unsafe.park(Native Method)
> - waiting on 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@2da8b14c
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
> at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
> at 
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> !image-2021-12-19-11-48-33-622.png!
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-25373) task manager can not free memory when jobs are finished

2023-09-26 Thread yuanfenghu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17769033#comment-17769033
 ] 

yuanfenghu edited comment on FLINK-25373 at 9/26/23 8:56 AM:
-

I also encountered the same problem
Hi,  [~xtsong] 
Our tm runs in docker. This problem will be triggered after the task is stopped 
on the original cluster or the task encounters an abnormal restart, which will 
cause these containers to be killed.
I think this part is not heap memory, because I saw through jvm that my heap 
memory is only 5G but the top command shows more than 15G. 



 

 


was (Author: JIRAUSER296932):
I also encountered the same problem
Hi,  [~xtsong] 
Our tm runs in docker. This problem will be triggered after the task is stopped 
on the original cluster or the task encounters an abnormal restart, which will 
cause these containers to be killed.
I think this part is not heap memory, because I saw through jvm that my heap 
memory is only 5G but the top command shows more than 15G. 
I have 6 slots per tm, but there are more than 6 
org.apache.flink.runtime.memory.MemoryManager

!image-2023-09-26-15-48-25-221.png!

 

> task manager can not free memory when jobs are finished
> ---
>
> Key: FLINK-25373
> URL: https://issues.apache.org/jira/browse/FLINK-25373
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.14.0
> Environment: flink 1.14.0
>Reporter: Spongebob
>Priority: Major
> Attachments: image-2021-12-19-11-48-33-622.png, 
> image-2022-03-11-10-06-19-499.png, image-2023-09-26-15-48-25-221.png
>
>
> I submit my Flinksql jobs to the Flink standalone cluster and, contrary to 
> my expectation, TaskManagers could not free memory when all jobs were 
> finished, whether normally or not.
> I also found that there were many threads named like 
> `flink-taskexecutor-io-thread-x` whose states were waiting on conditions.
> Here is the detail of these threads:
>  
> "flink-taskexecutor-io-thread-31" Id=5386 WAITING on 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@2da8b14c
> at sun.misc.Unsafe.park(Native Method)
> - waiting on 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@2da8b14c
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
> at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
> at 
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> !image-2021-12-19-11-48-33-622.png!
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-23190) Make task-slot allocation much more evenly

2023-06-27 Thread yuanfenghu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17737623#comment-17737623
 ] 

yuanfenghu commented on FLINK-23190:


[~loyi] I think there are some flaws; I have done similar work before, for example

> Make task-slot allocation much more evenly
> --
>
> Key: FLINK-23190
> URL: https://issues.apache.org/jira/browse/FLINK-23190
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.14.0, 1.12.3, 1.13.1
>Reporter: loyi
>Assignee: loyi
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Attachments: image-2021-07-16-10-34-30-700.png
>
>
> FLINK-12122 only guarantees spreading out tasks across the set of TMs which 
> are registered at the time of scheduling, but our jobs all run in active 
> YARN mode, and a job with smaller source parallelism often causes 
> load-balance issues. 
>  
> For this job:
> {code:java}
> //  -ys 4 means 10 taskmanagers
> env.addSource(...).name("A").setParallelism(10).
>  map(...).name("B").setParallelism(30)
>  .map(...).name("C").setParallelism(40)
>  .addSink(...).name("D").setParallelism(20);
> {code}
>  
>  Flink-1.12.3 task allocation: 
> ||operator||tm1 ||tm2||tm3||tm4||tm5||5m6||tm7||tm8||tm9||tm10||
> |A| 
> 1|{color:#de350b}2{color}|{color:#de350b}2{color}|1|1|{color:#de350b}3{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|
> |B|3|3|3|3|3|3|3|3|{color:#de350b}2{color}|{color:#de350b}4{color}|
> |C|4|4|4|4|4|4|4|4|4|4|
> |D|2|2|2|2|2|{color:#de350b}1{color}|{color:#de350b}1{color}|2|2|{color:#de350b}4{color}|
>  
> Suggestions:
> When TaskManagers start registering slots with the slotManager, the current 
> processing logic chooses the first pendingSlot which meets its resource 
> requirements.  This "random" strategy usually causes uneven task allocation 
> when a source operator's parallelism is significantly below a process 
> operator's.  A simple feasible idea is to {color:#de350b}partition{color} 
> the current "{color:#de350b}pendingSlots{color}" by their "JobVertexIds" 
> (e.g. let the AllocationID carry the detail), then allocate the slots 
> proportionally to each JobVertexGroup.
>  
> For above case, the 40 pendingSlots could be divided into 4 groups:
> [ABCD]: 10        // A、B、C、D each represent a {color:#de350b}jobVertexId{color}
> [BCD]: 10
> [CD]: 10
> [D]: 10
>  
> Every taskmanager will provide 4 slots at a time, and each group will get 1 
> slot according to its proportion (1/4); the final allocation result is below:
> [ABCD]: deployed on 10 different taskmanagers
> [BCD]: deployed on 10 different taskmanagers
> [CD]: deployed on 10 different taskmanagers
> [D]: deployed on 10 different taskmanagers
>  
> I have implemented a [concept 
> code|https://github.com/saddays/flink/commit/dc82e60a7c7599fbcb58c14f8e3445bc8d07ace1]
> based on Flink-1.12.3; the patched version has {color:#de350b}fully 
> even{color} task allocation and works well on my workload.  Are there other 
> points that have not been considered, or does it conflict with future plans? 
> Sorry for my poor English.
>  
>  
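
A toy sketch of the proportional idea above (plain strings stand in for 
vertex groups; none of these names are Flink classes):

{code:java}
import java.util.*;

public class EvenSlotSketch {
    public static void main(String[] args) {
        // 40 pending slots, partitioned by the set of vertices they would host.
        Map<String, Integer> groups = new LinkedHashMap<>();
        groups.put("ABCD", 10);
        groups.put("BCD", 10);
        groups.put("CD", 10);
        groups.put("D", 10);

        // 10 TaskManagers offer 4 slots each; every group takes one slot per
        // TM (its 1/4 share), so each group spreads across all 10 TMs.
        Map<String, List<Integer>> placement = new LinkedHashMap<>();
        groups.keySet().forEach(g -> placement.put(g, new ArrayList<>()));
        for (int tm = 1; tm <= 10; tm++) {
            for (String g : groups.keySet()) {
                placement.get(g).add(tm);
            }
        }
        // Each group ends up with exactly groups.get(g) == 10 placements,
        // one per TaskManager, matching the allocation table above.
        placement.forEach((g, tms) -> System.out.println(g + " -> " + tms));
    }
}
{code}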



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] (FLINK-23190) Make task-slot allocation much more evenly

2023-06-27 Thread yuanfenghu (Jira)


[ https://issues.apache.org/jira/browse/FLINK-23190 ]


yuanfenghu deleted comment on FLINK-23190:


was (Author: JIRAUSER296932):
[~loyi] I think there are some flaws; I have done similar work before, for example

> Make task-slot allocation much more evenly
> --
>
> Key: FLINK-23190
> URL: https://issues.apache.org/jira/browse/FLINK-23190
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.14.0, 1.12.3, 1.13.1
>Reporter: loyi
>Assignee: loyi
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Attachments: image-2021-07-16-10-34-30-700.png
>
>
> FLINK-12122 only guarantees spreading out tasks across the set of TMs which 
> are registered at the time of scheduling, but our jobs all run in active 
> YARN mode, and a job with smaller source parallelism often causes 
> load-balance issues. 
>  
> For this job:
> {code:java}
> //  -ys 4 means 10 taskmanagers
> env.addSource(...).name("A").setParallelism(10).
>  map(...).name("B").setParallelism(30)
>  .map(...).name("C").setParallelism(40)
>  .addSink(...).name("D").setParallelism(20);
> {code}
>  
>  Flink-1.12.3 task allocation: 
> ||operator||tm1 ||tm2||tm3||tm4||tm5||5m6||tm7||tm8||tm9||tm10||
> |A| 
> 1|{color:#de350b}2{color}|{color:#de350b}2{color}|1|1|{color:#de350b}3{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|
> |B|3|3|3|3|3|3|3|3|{color:#de350b}2{color}|{color:#de350b}4{color}|
> |C|4|4|4|4|4|4|4|4|4|4|
> |D|2|2|2|2|2|{color:#de350b}1{color}|{color:#de350b}1{color}|2|2|{color:#de350b}4{color}|
>  
> Suggestions:
> When TaskManagers start registering slots with the slotManager, the current 
> processing logic chooses the first pendingSlot which meets its resource 
> requirements.  This "random" strategy usually causes uneven task allocation 
> when a source operator's parallelism is significantly below a process 
> operator's.  A simple feasible idea is to {color:#de350b}partition{color} 
> the current "{color:#de350b}pendingSlots{color}" by their "JobVertexIds" 
> (e.g. let the AllocationID carry the detail), then allocate the slots 
> proportionally to each JobVertexGroup.
>  
> For above case, the 40 pendingSlots could be divided into 4 groups:
> [ABCD]: 10        // A、B、C、D each represent a {color:#de350b}jobVertexId{color}
> [BCD]: 10
> [CD]: 10
> [D]: 10
>  
> Every taskmanager will provide 4 slots at a time, and each group will get 1 
> slot according to its proportion (1/4); the final allocation result is below:
> [ABCD]: deployed on 10 different taskmanagers
> [BCD]: deployed on 10 different taskmanagers
> [CD]: deployed on 10 different taskmanagers
> [D]: deployed on 10 different taskmanagers
>  
> I have implemented a [concept 
> code|https://github.com/saddays/flink/commit/dc82e60a7c7599fbcb58c14f8e3445bc8d07ace1]
> based on Flink-1.12.3; the patched version has {color:#de350b}fully 
> even{color} task allocation and works well on my workload.  Are there other 
> points that have not been considered, or does it conflict with future plans? 
> Sorry for my poor English.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-23190) Make task-slot allocation much more evenly

2023-06-27 Thread yuanfenghu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17737625#comment-17737625
 ] 

yuanfenghu commented on FLINK-23190:


[~loyi]

I think there are still some problems. My test environment is 5 TMs * 5 slots, 
and my job is

5 -> rebalance -> 25. The first task is only assigned to two TMs. I debugged 
and found that there are 25 shared slots, of which five groups (groupA) hold 
two tasks each and the remaining twenty groups (groupB) hold one task each. 
The expected sequence of generated slot requests is

AAAAA

But the actual sequence is:

ABABABABABB

> Make task-slot allocation much more evenly
> --
>
> Key: FLINK-23190
> URL: https://issues.apache.org/jira/browse/FLINK-23190
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.14.0, 1.12.3, 1.13.1
>Reporter: loyi
>Assignee: loyi
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Attachments: image-2021-07-16-10-34-30-700.png
>
>
> FLINK-12122 only guarantees spreading out tasks across the set of TMs which 
> are registered at the time of scheduling, but our jobs all run in active 
> YARN mode, and a job with smaller source parallelism often causes 
> load-balance issues. 
>  
> For this job:
> {code:java}
> //  -ys 4 means 10 taskmanagers
> env.addSource(...).name("A").setParallelism(10).
>  map(...).name("B").setParallelism(30)
>  .map(...).name("C").setParallelism(40)
>  .addSink(...).name("D").setParallelism(20);
> {code}
>  
>  Flink-1.12.3 task allocation: 
> ||operator||tm1 ||tm2||tm3||tm4||tm5||5m6||tm7||tm8||tm9||tm10||
> |A| 
> 1|{color:#de350b}2{color}|{color:#de350b}2{color}|1|1|{color:#de350b}3{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|
> |B|3|3|3|3|3|3|3|3|{color:#de350b}2{color}|{color:#de350b}4{color}|
> |C|4|4|4|4|4|4|4|4|4|4|
> |D|2|2|2|2|2|{color:#de350b}1{color}|{color:#de350b}1{color}|2|2|{color:#de350b}4{color}|
>  
> Suggestions:
> When TaskManagers start registering slots with the slotManager, the current 
> processing logic chooses the first pendingSlot which meets its resource 
> requirements.  This "random" strategy usually causes uneven task allocation 
> when a source operator's parallelism is significantly below a process 
> operator's.  A simple feasible idea is to {color:#de350b}partition{color} 
> the current "{color:#de350b}pendingSlots{color}" by their "JobVertexIds" 
> (e.g. let the AllocationID carry the detail), then allocate the slots 
> proportionally to each JobVertexGroup.
>  
> For above case, the 40 pendingSlots could be divided into 4 groups:
> [ABCD]: 10        // A、B、C、D each represent a {color:#de350b}jobVertexId{color}
> [BCD]: 10
> [CD]: 10
> [D]: 10
>  
> Every taskmanager will provide 4 slots at a time, and each group will get 1 
> slot according to its proportion (1/4); the final allocation result is below:
> [ABCD]: deployed on 10 different taskmanagers
> [BCD]: deployed on 10 different taskmanagers
> [CD]: deployed on 10 different taskmanagers
> [D]: deployed on 10 different taskmanagers
>  
> I have implemented a [concept 
> code|https://github.com/saddays/flink/commit/dc82e60a7c7599fbcb58c14f8e3445bc8d07ace1]
> based on Flink-1.12.3; the patched version has {color:#de350b}fully 
> even{color} task allocation and works well on my workload.  Are there other 
> points that have not been considered, or does it conflict with future plans? 
> Sorry for my poor English.
>  
>  
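As a rough standalone illustration of the proportional partitioning described 
above (plain strings stand in for the JobVertexId sets, and the numbers come 
from the example job; this is only a sketch of the idea, not the linked patch):
{code:java}
import java.util.LinkedHashMap;
import java.util.Map;

public class ProportionalSlotGrouping {
    public static void main(String[] args) {
        // The 40 pending slots, keyed by the job vertices that would share them.
        Map<String, Integer> pendingByGroup = new LinkedHashMap<>();
        pendingByGroup.put("ABCD", 10);
        pendingByGroup.put("BCD", 10);
        pendingByGroup.put("CD", 10);
        pendingByGroup.put("D", 10);

        int totalPending = pendingByGroup.values().stream()
                .mapToInt(Integer::intValue).sum(); // 40
        int slotsPerTm = 4; // each TM offers 4 slots at a time (-ys 4)

        // Give each group a share of every TM's slots proportional to its
        // fraction of the pending slots: 4 * 10 / 40 = 1 slot per TM.
        for (Map.Entry<String, Integer> e : pendingByGroup.entrySet()) {
            int perTm = slotsPerTm * e.getValue() / totalPending;
            System.out.printf("[%s] -> %d slot(s) on each of the 10 TMs%n",
                    e.getKey(), perTm);
        }
    }
}
{code}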



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-23190) Make task-slot allocation much more evenly

2023-06-27 Thread yuanfenghu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17737632#comment-17737632
 ] 

yuanfenghu commented on FLINK-23190:


[~loyi]  I think there are other issues to consider:
 # Flink applies for slots for a whole pipelined region together, so you 
should not experiment only with rebalance. Generally speaking, if there are 
multiple pipelined regions in a pipeline, the allocateSlotsForVertices method 
will be called multiple times, so you also need to try rescale to generate 
multiple pipelined regions. [~Weijie Guo] [~guoweijie]  I see that you have 
completed this part of the code; I would like to ask whether you have any 
opinions on this issue. I look forward to your feedback.
 # How do we ensure even distribution when there are multiple pipelined 
regions in a Flink job?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-23190) Make task-slot allocation much more evenly

2023-06-27 Thread yuanfenghu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17737625#comment-17737625
 ] 

yuanfenghu edited comment on FLINK-23190 at 6/27/23 12:32 PM:
--

[~loyi] 

I think there are still some problems. My test environment is 5 TMs * 5 slots, 
and my job is

5 -> rebalance -> 25. The first vertex's tasks are only assigned to two TMs. I 
debugged and found that there are 25 shared slots: five groups (group A) hold 
two tasks each, and twenty groups (group B) hold one task each. The expected 
sequence of generated slot requests is

AAAAA

But the actual sequence is:

ABABABABABB






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-23190) Make task-slot allocation much more evenly

2023-06-28 Thread yuanfenghu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17738089#comment-17738089
 ] 

yuanfenghu commented on FLINK-23190:


[~loyi] 

No, I just use:

```
private Collection<ExecutionSlotSharingGroup> sortExecutionSlotSharingGroup(
        Collection<ExecutionSlotSharingGroup> executionVertexGroups) {

    // Group the ExecutionSlotSharingGroups by the set of job vertices they contain.
    Map<Set<JobVertexID>, List<ExecutionSlotSharingGroup>> jobVertexGroups =
            executionVertexGroups.stream()
                    .collect(Collectors.groupingBy(this::getJobVertexSharingGroup));

    // Order the groups so that those covering more job vertices come first.
    List<List<ExecutionSlotSharingGroup>> groups = jobVertexGroups.entrySet()
            .stream()
            .sorted((g1, g2) -> Integer.compare(g2.getKey().size(), g1.getKey().size()))
            .map(Map.Entry::getValue)
            .collect(Collectors.toList());

    // Take one group at a time in round-robin order so that the generated
    // slot requests are interleaved.
    List<ExecutionSlotSharingGroup> sorted = new ArrayList<>();
    int i = executionVertexGroups.size(), j = 0;
    while (i > 0) {
        List<ExecutionSlotSharingGroup> group = groups.get((j++) % groups.size());
        if (group.isEmpty()) {
            continue;
        }
        i--;
        sorted.add(group.remove(0));
    }
    return sorted;
}
```
and the Flink version I use is 1.16; is this different from your experimental 
environment?
 




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-23190) Make task-slot allocation much more evenly

2023-06-28 Thread yuanfenghu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17738089#comment-17738089
 ] 

yuanfenghu edited comment on FLINK-23190 at 6/28/23 12:15 PM:
--

[~loyi] 

No, I just use:

```
private Collection<ExecutionSlotSharingGroup> sortExecutionSlotSharingGroup(
        Collection<ExecutionSlotSharingGroup> executionVertexGroups) {

    // Group the ExecutionSlotSharingGroups by the set of job vertices they contain.
    Map<Set<JobVertexID>, List<ExecutionSlotSharingGroup>> jobVertexGroups =
            executionVertexGroups.stream()
                    .collect(Collectors.groupingBy(this::getJobVertexSharingGroup));

    // Order the groups so that those covering more job vertices come first.
    List<List<ExecutionSlotSharingGroup>> groups = jobVertexGroups.entrySet()
            .stream()
            .sorted((g1, g2) -> Integer.compare(g2.getKey().size(), g1.getKey().size()))
            .map(Map.Entry::getValue)
            .collect(Collectors.toList());

    // Take one group at a time in round-robin order so that the generated
    // slot requests are interleaved.
    List<ExecutionSlotSharingGroup> sorted = new ArrayList<>();
    int i = executionVertexGroups.size(), j = 0;
    while (i > 0) {
        List<ExecutionSlotSharingGroup> group = groups.get((j++) % groups.size());
        if (group.isEmpty()) {
            continue;
        }
        i--;
        sorted.add(group.remove(0));
    }
    return sorted;
}
```

and the Flink version I use is 1.16; is this different from your experimental 
environment?
 




[jira] (FLINK-23190) Make task-slot allocation much more evenly

2023-06-28 Thread yuanfenghu (Jira)


[ https://issues.apache.org/jira/browse/FLINK-23190 ]


yuanfenghu deleted comment on FLINK-23190:






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-23190) Make task-slot allocation much more evenly

2023-06-28 Thread yuanfenghu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17738090#comment-17738090
 ] 

yuanfenghu commented on FLINK-23190:


[~loyi] 

No, I just use:

```
private Collection<ExecutionSlotSharingGroup> sortExecutionSlotSharingGroup(
        Collection<ExecutionSlotSharingGroup> executionVertexGroups) {

    // Group the ExecutionSlotSharingGroups by the set of job vertices they contain.
    Map<Set<JobVertexID>, List<ExecutionSlotSharingGroup>> jobVertexGroups =
            executionVertexGroups.stream()
                    .collect(Collectors.groupingBy(this::getJobVertexSharingGroup));

    // Order the groups so that those covering more job vertices come first.
    List<List<ExecutionSlotSharingGroup>> groups = jobVertexGroups.entrySet()
            .stream()
            .sorted((g1, g2) -> Integer.compare(g2.getKey().size(), g1.getKey().size()))
            .map(Map.Entry::getValue)
            .collect(Collectors.toList());

    // Take one group at a time in round-robin order so that the generated
    // slot requests are interleaved.
    List<ExecutionSlotSharingGroup> sorted = new ArrayList<>();
    int i = executionVertexGroups.size(), j = 0;
    while (i > 0) {
        List<ExecutionSlotSharingGroup> group = groups.get((j++) % groups.size());
        if (group.isEmpty()) {
            continue;
        }
        i--;
        sorted.add(group.remove(0));
    }
    return sorted;
}
```

and the Flink version I use is 1.16.2; is this different from your 
experimental environment?




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] (FLINK-23190) Make task-slot allocation much more evenly

2023-06-28 Thread yuanfenghu (Jira)


[ https://issues.apache.org/jira/browse/FLINK-23190 ]


yuanfenghu deleted comment on FLINK-23190:






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-23190) Make task-slot allocation much more evenly

2023-06-28 Thread yuanfenghu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17738092#comment-17738092
 ] 

yuanfenghu commented on FLINK-23190:


[~loyi] 
{code:java}
@Override
public List<SlotExecutionVertexAssignment> allocateSlotsFor(
        List<ExecutionVertexID> executionVertexIds) {
    // bla bla
    Map<ExecutionSlotSharingGroup, List<ExecutionVertexID>> executionsByGroup =
            executionVertexIds.stream()
                    .collect(
                            Collectors.groupingBy(
                                    slotSharingStrategy::getExecutionSlotSharingGroup));

    // Here we can control the ExecutionSlotSharingGroup allocation order
    // instead of random order. For example, a circular order of JobVertexGroups.
    Map<ExecutionSlotSharingGroup, SharedSlot> slots =
            sortExecutionSlotSharingGroup(executionsByGroup.keySet())
                    .stream()
                    .map(group -> getOrAllocateSharedSlot(group, sharedSlotProfileRetriever))
                    .collect(
                            Collectors.toMap(
                                    SharedSlot::getExecutionSlotSharingGroup,
                                    Function.identity()));
    // bla bla
}

private Collection<ExecutionSlotSharingGroup> sortExecutionSlotSharingGroup(
        Collection<ExecutionSlotSharingGroup> executionVertexGroups) {

    // Group the ExecutionSlotSharingGroups by the set of job vertices they contain.
    Map<Set<JobVertexID>, List<ExecutionSlotSharingGroup>> jobVertexGroups =
            executionVertexGroups.stream()
                    .collect(Collectors.groupingBy(this::getJobVertexSharingGroup));

    // Order the groups so that those covering more job vertices come first.
    List<List<ExecutionSlotSharingGroup>> groups = jobVertexGroups.entrySet()
            .stream()
            .sorted((g1, g2) -> Integer.compare(g2.getKey().size(), g1.getKey().size()))
            .map(Map.Entry::getValue)
            .collect(Collectors.toList());

    // Take one group at a time in round-robin order so that the generated
    // slot requests are interleaved.
    List<ExecutionSlotSharingGroup> sorted = new ArrayList<>();
    int i = executionVertexGroups.size(), j = 0;
    while (i > 0) {
        List<ExecutionSlotSharingGroup> group = groups.get((j++) % groups.size());
        if (group.isEmpty()) {
            continue;
        }
        i--;
        sorted.add(group.remove(0));
    }
    return sorted;
}

Set<JobVertexID> getJobVertexSharingGroup(ExecutionSlotSharingGroup group) {
    return group.getExecutionVertexIds().stream()
            .map(ExecutionVertexID::getJobVertexId)
            .collect(Collectors.toSet());
}
{code}
I just use the code, and the Flink version I use is 1.16.2; is this different 
from your experimental environment?


[jira] [Commented] (FLINK-23190) Make task-slot allocation much more evenly

2023-06-28 Thread yuanfenghu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17738094#comment-17738094
 ] 

yuanfenghu commented on FLINK-23190:


{code:java}
@Override
public List<SlotExecutionVertexAssignment> allocateSlotsFor(
        List<ExecutionVertexID> executionVertexIds) {
    // bla bla
    Map<ExecutionSlotSharingGroup, List<ExecutionVertexID>> executionsByGroup =
            executionVertexIds.stream()
                    .collect(
                            Collectors.groupingBy(
                                    slotSharingStrategy::getExecutionSlotSharingGroup));

    // Here we can control the ExecutionSlotSharingGroup allocation order
    // instead of random order. For example, a circular order of JobVertexGroups.
    Map<ExecutionSlotSharingGroup, SharedSlot> slots =
            sortExecutionSlotSharingGroup(executionsByGroup.keySet())
                    .stream()
                    .map(group -> getOrAllocateSharedSlot(group, sharedSlotProfileRetriever))
                    .collect(
                            Collectors.toMap(
                                    SharedSlot::getExecutionSlotSharingGroup,
                                    Function.identity()));
    // bla bla
}

private Collection<ExecutionSlotSharingGroup> sortExecutionSlotSharingGroup(
        Collection<ExecutionSlotSharingGroup> executionVertexGroups) {

    Map<Set<JobVertexID>, List<ExecutionSlotSharingGroup>> jobVertexGroups =
            executionVertexGroups.stream()
                    .collect(Collectors.groupingBy(this::getJobVertexSharingGroup));

    List<List<ExecutionSlotSharingGroup>> groups = jobVertexGroups.entrySet()
            .stream()
            .sorted((g1, g2) -> Integer.compare(g2.getKey().size(), g1.getKey().size()))
            .map(Map.Entry::getValue)
            .collect(Collectors.toList());

    List<ExecutionSlotSharingGroup> sorted = new ArrayList<>();
    int i = executionVertexGroups.size(), j = 0;
    while (i > 0) {
        List<ExecutionSlotSharingGroup> group = groups.get((j++) % groups.size());
        if (group.isEmpty()) {
            continue;
        }
        i--;
        sorted.add(group.remove(0));
    }
    return sorted;
}

Set<JobVertexID> getJobVertexSharingGroup(ExecutionSlotSharingGroup group) {
    return group.getExecutionVertexIds().stream()
            .map(ExecutionVertexID::getJobVertexId)
            .collect(Collectors.toSet());
}
{code}
[~loyi]  I just use the code, and the Flink version I use is 1.16.2; is this 
different from your experimental environment?




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] (FLINK-23190) Make task-slot allocation much more evenly

2023-06-28 Thread yuanfenghu (Jira)


[ https://issues.apache.org/jira/browse/FLINK-23190 ]


yuanfenghu deleted comment on FLINK-23190:






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-23190) Make task-slot allocation much more evenly

2023-06-28 Thread yuanfenghu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17738092#comment-17738092
 ] 

yuanfenghu edited comment on FLINK-23190 at 6/28/23 12:21 PM:
--

[~loyi] 
{code:java}
@Override
public List<SlotExecutionVertexAssignment> allocateSlotsFor(
        List<ExecutionVertexID> executionVertexIds) {
    // bla bla
    Map<ExecutionSlotSharingGroup, List<ExecutionVertexID>> executionsByGroup =
            executionVertexIds.stream()
                    .collect(
                            Collectors.groupingBy(
                                    slotSharingStrategy::getExecutionSlotSharingGroup));

    // Here we can control the ExecutionSlotSharingGroup allocation order
    // instead of random order. For example, a circular order of JobVertexGroups.
    Map<ExecutionSlotSharingGroup, SharedSlot> slots =
            sortExecutionSlotSharingGroup(executionsByGroup.keySet())
                    .stream()
                    .map(group -> getOrAllocateSharedSlot(group, sharedSlotProfileRetriever))
                    .collect(
                            Collectors.toMap(
                                    SharedSlot::getExecutionSlotSharingGroup,
                                    Function.identity()));
    // bla bla
}

private Collection<ExecutionSlotSharingGroup> sortExecutionSlotSharingGroup(
        Collection<ExecutionSlotSharingGroup> executionVertexGroups) {

    Map<Set<JobVertexID>, List<ExecutionSlotSharingGroup>> jobVertexGroups =
            executionVertexGroups.stream()
                    .collect(Collectors.groupingBy(this::getJobVertexSharingGroup));

    List<List<ExecutionSlotSharingGroup>> groups = jobVertexGroups.entrySet()
            .stream()
            .sorted((g1, g2) -> Integer.compare(g2.getKey().size(), g1.getKey().size()))
            .map(Map.Entry::getValue)
            .collect(Collectors.toList());

    List<ExecutionSlotSharingGroup> sorted = new ArrayList<>();
    int i = executionVertexGroups.size(), j = 0;
    while (i > 0) {
        List<ExecutionSlotSharingGroup> group = groups.get((j++) % groups.size());
        if (group.isEmpty()) {
            continue;
        }
        i--;
        sorted.add(group.remove(0));
    }
    return sorted;
}

Set<JobVertexID> getJobVertexSharingGroup(ExecutionSlotSharingGroup group) {
    return group.getExecutionVertexIds().stream()
            .map(ExecutionVertexID::getJobVertexId)
            .collect(Collectors.toSet());
}
{code}
I just use the code, and the Flink version I use is 1.16.2; is this different 
from your experimental environment?




[jira] [Commented] (FLINK-31757) Optimize Flink un-balanced tasks scheduling

2023-07-27 Thread yuanfenghu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17748386#comment-17748386
 ] 

yuanfenghu commented on FLINK-31757:


Is there any progress on this topic? We are also facing this problem now, and 
it is quite a headache.
 

> Optimize Flink un-balanced tasks scheduling
> ---
>
> Key: FLINK-31757
> URL: https://issues.apache.org/jira/browse/FLINK-31757
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: RocMarshal
>Assignee: RocMarshal
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2023-04-13-08-04-04-667.png
>
>
> Suppose we have a job with 21 {{JobVertex}}es. The parallelism of vertex A is 
> 100, and the others are 5. If each {{TaskManager}} has only one slot, then 
> we need 100 TMs.
> There will be 5 slots with 21 sub-tasks each, and the other slots will only 
> have one sub-task of A. Does this mean we have to make a trade-off between 
> wasted resources and insufficient resources?
> From a resource utilization point of view, we expect all subtasks to be 
> evenly distributed across the TMs.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30884) GetTable from Flink catalog should be judged whether it is a sink table

2023-02-02 Thread yuanfenghu (Jira)
yuanfenghu created FLINK-30884:
--

 Summary: GetTable from Flink catalog should be judged whether it 
is a sink table
 Key: FLINK-30884
 URL: https://issues.apache.org/jira/browse/FLINK-30884
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / API, Table SQL / Planner
Affects Versions: 1.16.1
Reporter: yuanfenghu


I want to say that when I use a third-party persistent catalog to manage the 
metadata of my persistent tables,

I may want to decide the options of the table I need to generate based on 
whether it is a sink table.

For example, when using the kafka connector:

When I use kafka as a source table,

the following parameters are required:

offset, properties.group.id, etc.

When I use kafka as a sink table, I will pass in some parameters that only 
concern the sink table, for example:

sink.delivery-guarantee

sink.partitioner

So why can't we add a switch to tell the catalog this information? It would be 
very useful in platform development!
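
A minimal sketch of what such a switch could look like follows; note that the 
{{TableKind}} enum and the extra {{getTable}} overload below are hypothetical 
illustrations of the request, not an existing Flink API:
{code:java}
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;

/** Hypothetical extension: the planner tells the catalog how the table is used. */
public interface KindAwareCatalog extends Catalog {

    /** Hypothetical marker for the role a table is about to play. */
    enum TableKind {
        SOURCE,
        SINK
    }

    /**
     * The catalog could fill in role-specific options here, e.g.
     * scan.startup.mode for a source vs. sink.delivery-guarantee for a sink.
     */
    CatalogBaseTable getTable(ObjectPath tablePath, TableKind kind)
            throws TableNotExistException, CatalogException;
}
{code}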

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-30884) GetTable from Flink catalog should be judged whether it is a sink table

2023-02-02 Thread yuanfenghu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30884?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuanfenghu updated FLINK-30884:
---
Priority: Major  (was: Blocker)




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30884) GetTable from Flink catalog should be judged whether it is a sink table

2023-02-02 Thread yuanfenghu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30884?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17683386#comment-17683386
 ] 

yuanfenghu commented on FLINK-30884:


[~fsk119] [~godfrey]  




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30884) GetTable from Flink catalog should be judged whether it is a sink table

2023-02-03 Thread yuanfenghu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30884?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17684088#comment-17684088
 ] 

yuanfenghu commented on FLINK-30884:


In one case, we used an external catalog to manage my kafka tables, preserving 
information such as:

the topic, format, columns, and column types.

But the kafka table can actually be used for reading or writing. When it is 
used as a sink table, I may add some other parameters, such as:

sink.delivery-guarantee

sink.partitioner

the properties.sasl.jaas.config authentication information, etc.

As a source I need to add these parameters:

scan.startup.mode

offset

the properties.sasl.jaas.config authentication information, etc.

I need to ask some external systems to generate this information. Therefore, I 
want to know whether getTable is being called for a sink table so that I can 
determine how to generate these options.
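
For instance, platform code could assemble two different option sets for the 
same Kafka table depending on its role; a sketch (the option keys are standard 
Kafka connector options, while the helper and its values are illustrative 
assumptions):
{code:java}
import java.util.HashMap;
import java.util.Map;

public class KafkaTableOptions {

    /** Illustrative helper: extend shared topic/format options by table role. */
    static Map<String, String> optionsFor(boolean isSink, Map<String, String> shared) {
        Map<String, String> options = new HashMap<>(shared);
        if (isSink) {
            // Write-side options.
            options.put("sink.delivery-guarantee", "at-least-once");
            options.put("sink.partitioner", "default");
        } else {
            // Read-side options.
            options.put("scan.startup.mode", "group-offsets");
            options.put("properties.group.id", "my-group");
        }
        // Authentication is needed for both roles.
        options.put("properties.sasl.jaas.config", "<jaas config here>");
        return options;
    }
}
{code}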




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-23190) Make task-slot allocation much more evenly

2023-06-24 Thread yuanfenghu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17736826#comment-17736826
 ] 

yuanfenghu commented on FLINK-23190:


[~loyi]  Thank you. In our scenario there are also problems with uneven slot 
allocation, especially after a rebalance, when all tasks are considered to be 
in one region. I am willing to try your solution.
 

> Make task-slot allocation much more evenly
> --
>
> Key: FLINK-23190
> URL: https://issues.apache.org/jira/browse/FLINK-23190
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.14.0, 1.12.3, 1.13.1
>Reporter: loyi
>Assignee: loyi
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Attachments: image-2021-07-16-10-34-30-700.png
>
>
> FLINK-12122 only guarantees spreading out tasks across the set of TMs which 
> are registered at the time of scheduling, but our jobs are all runing on 
> active yarn mode, the job with smaller source parallelism offen cause 
> load-balance issues. 
>  
> For this job:
> {code:java}
> //  -ys 4 means 10 taskmanagers
> env.addSource(...).name("A").setParallelism(10).
>  map(...).name("B").setParallelism(30)
>  .map(...).name("C").setParallelism(40)
>  .addSink(...).name("D").setParallelism(20);
> {code}
>  
>  Flink-1.12.3 task allocation: 
> ||operator||tm1||tm2||tm3||tm4||tm5||tm6||tm7||tm8||tm9||tm10||
> |A|1|{color:#de350b}2{color}|{color:#de350b}2{color}|1|1|{color:#de350b}3{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|
> |B|3|3|3|3|3|3|3|3|{color:#de350b}2{color}|{color:#de350b}4{color}|
> |C|4|4|4|4|4|4|4|4|4|4|
> |D|2|2|2|2|2|{color:#de350b}1{color}|{color:#de350b}1{color}|2|2|{color:#de350b}4{color}|
>  
> Suggestions:
> When a TaskManager starts registering slots with the SlotManager, the 
> current processing logic chooses the first pendingSlot that meets its 
> resource requirements. This "random" strategy usually causes uneven task 
> allocation when the source operator's parallelism is significantly below the 
> processing operators'. A simple, feasible idea is to 
> {color:#de350b}partition{color} the current 
> "{color:#de350b}pendingSlots{color}" by their "JobVertexIds" (for example, 
> let the AllocationID carry that detail), then allocate the slots 
> proportionally to each JobVertexGroup.
>  
> For above case, the 40 pendingSlots could be divided into 4 groups:
> [ABCD]: 10        // A, B, C, D represent {color:#de350b}jobVertexId{color}s
> [BCD]: 10
> [CD]: 10
> [D]: 10
>  
> Every taskmanager will provide 4 slots at a time, and each group will get 1 
> slot according to its proportion (1/4); the final allocation result is below:
> [ABCD]: deployed on 10 different taskmanagers
> [BCD]: deployed on 10 different taskmanagers
> [CD]: deployed on 10 different taskmanagers
> [D]: deployed on 10 different taskmanagers
>  
> I have implemented a [concept 
> code|https://github.com/saddays/flink/commit/dc82e60a7c7599fbcb58c14f8e3445bc8d07ace1]
>  based on Flink-1.12.3; the patched version achieves {color:#de350b}fully 
> even{color} task allocation and works well on my workload. Are there other 
> points that have not been considered, or does it conflict with future plans? 
> Sorry for my poor English.
>  
>  
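
A minimal, self-contained sketch of the proportional grouping idea quoted 
above (simplified types only; this is not the actual SlotManager API):
{code:java}
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ProportionalSlotAssigner {

    // pending: remaining slot requests per vertex-group key,
    // e.g. {ABCD=10, BCD=10, CD=10, D=10} for the job above.
    // Returns: group key -> indices of the TMs the group's slots landed on.
    public static Map<String, List<Integer>> assign(
            LinkedHashMap<String, Integer> pending, int numTaskManagers, int slotsPerTm) {
        Map<String, List<Integer>> placement = new LinkedHashMap<>();
        pending.keySet().forEach(k -> placement.put(k, new ArrayList<>()));
        for (int tm = 0; tm < numTaskManagers; tm++) {
            int offered = slotsPerTm;
            boolean progress = true;
            // hand this TM's slots out round-robin across the vertex groups
            while (offered > 0 && progress) {
                progress = false;
                for (String group : pending.keySet()) {
                    if (offered == 0) {
                        break;
                    }
                    int left = pending.get(group);
                    if (left > 0) {
                        pending.put(group, left - 1);
                        placement.get(group).add(tm);
                        offered--;
                        progress = true;
                    }
                }
            }
        }
        return placement;
    }

    public static void main(String[] args) {
        LinkedHashMap<String, Integer> pending = new LinkedHashMap<>();
        pending.put("ABCD", 10);
        pending.put("BCD", 10);
        pending.put("CD", 10);
        pending.put("D", 10);
        // 10 TMs x 4 slots each: every group lands exactly once on every TM
        System.out.println(assign(pending, 10, 4));
    }
}
{code}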



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29881) when Fetch results in sql gateway, the result using open api is different from using restful api

2022-11-03 Thread yuanfenghu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17628708#comment-17628708
 ] 

yuanfenghu commented on FLINK-29881:


The executeStatement operation is asynchronous; you have to wait for the 
operation to reach a success status before you can get the result. There is a 
REST interface for getting the operation status.
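
A minimal sketch of that polling loop, reusing the generated client from the 
snippet quoted below (the getResultType() getter and the NOT_READY value are 
assumed from the printed response; the retry interval is arbitrary):
{code:java}
// Assumed helper: poll fetchResults until the asynchronous operation stops
// reporting NOT_READY, then return the payload response.
static FetchResultsResponseBody awaitResults(
        DefaultApi api, java.util.UUID sessionId, java.util.UUID operationId)
        throws Exception {
    FetchResultsResponseBody resp = api.fetchResults(sessionId, operationId, 0L);
    while ("NOT_READY".equals(String.valueOf(resp.getResultType()))) {
        Thread.sleep(100L); // back off before polling again
        resp = api.fetchResults(sessionId, operationId, 0L);
    }
    return resp;
}
{code}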

> when Fetch results in sql gateway, the result using open api is different  
> from using restful api  
> ---
>
> Key: FLINK-29881
> URL: https://issues.apache.org/jira/browse/FLINK-29881
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Gateway
>Affects Versions: 1.16.0
>Reporter: yiwei93
>Priority: Major
>
> Using the REST API, fetch results from 
> {code:java}
>   
> http://hermes02:8083/v1/sessions/9a8fcf37-73e5-43ca-bcc3-d44d8b71a24c/operations/b40085c1-a2c5-42f4-80e7-0971c5ef9710/result/0{code}
> the result is 
> {code:java}
> {
>   "results": {
>     "columns": [
>       {
>         "name": "localtimestamp",
>         "logicalType": {
>           "type": "TIMESTAMP_WITHOUT_TIME_ZONE",
>           "nullable": false,
>           "precision": 3
>         },
>         "comment": null
>       }
>     ],
>     "data": [
>       {
>         "kind": "INSERT",
>         "fields": [
>           "2022-11-04T11:41:40.036"
>         ]
>       }
>     ]
>   },
>   "resultType": "PAYLOAD",
>   "nextResultUri": 
> "/v1/sessions/9a8fcf37-73e5-43ca-bcc3-d44d8b71a24c/operations/b40085c1-a2c5-42f4-80e7-0971c5ef9710/result/1"
> }{code}
> Using the generated API client to fetch, the code is 
> {code:java}
> ApiClient client = new ApiClient();
> client.setHost("hermes02");
> client.setPort(8083);
> client.setScheme("http");
> defaultApi = new DefaultApi(client);
> OpenSessionRequestBody openSessionRequestBody = new OpenSessionRequestBody();
> OpenSessionResponseBody openSessionResponseBody = 
> defaultApi.openSession(openSessionRequestBody);
> SessionHandle sessionHandle = new 
> SessionHandle().identifier(UUID.fromString(openSessionResponseBody.getSessionHandle()));
> ExecuteStatementRequestBody executeStatementRequestBody = new 
> ExecuteStatementRequestBody().statement("select localtimestamp");
> ExecuteStatementResponseBody executeStatementResponseBody = 
> defaultApi.executeStatement(sessionHandle.getIdentifier(), 
> executeStatementRequestBody);
> FetchResultsResponseBody fetchResultsResponseBody = 
> defaultApi.fetchResults(sessionHandle.getIdentifier(), 
> UUID.fromString(executeStatementResponseBody.getOperationHandle()), 0L);{code}
> the result is 
> {code:java}
> class FetchResultsResponseBody {
>     results: class ResultSet {
>         resultType: null
>         nextToken: null
>         resultSchema: null
>         data: []
>     }
>     resultType: NOT_READY
>     nextResultUri: 
> /v1/sessions/9a8fcf37-73e5-43ca-bcc3-d44d8b71a24c/operations/b40085c1-a2c5-42f4-80e7-0971c5ef9710/result/0
> }{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-20184) update hive streaming read and temporal table documents

2022-11-04 Thread yuanfenghu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17628834#comment-17628834
 ] 

yuanfenghu commented on FLINK-20184:


Hello Leonard Xu,

Our platform is currently integrating with Hive, and I have some questions 
about this documentation.

  1. After integrating hive-catalog, what do I do next? Do I still need to 
call a CREATE DDL statement to create a Hive table?

  2. If HiveCatalog can be used, how do I use Hive as a dimension table? There 
seems to be no example in the documentation.

 3. If it is convenient, we could exchange on WeChat or DingTalk; my DingTalk 
number: qp7ad0k
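
For reference, a minimal sketch of what I mean in question 1 (the catalog 
name, database, and hive-conf path are placeholders):
{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class HiveCatalogExample {
    public static void main(String[] args) {
        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().inStreamingMode().build();
        TableEnvironment tEnv = TableEnvironment.create(settings);
        // register an existing Hive Metastore as a Flink catalog
        HiveCatalog hive = new HiveCatalog("myhive", "default", "/path/to/hive-conf");
        tEnv.registerCatalog("myhive", hive);
        tEnv.useCatalog("myhive");
        // tables already defined in Hive should be visible without any CREATE DDL
        tEnv.executeSql("SHOW TABLES").print();
    }
}
{code}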
 

> update hive streaming read and temporal table documents
> ---
>
> Key: FLINK-20184
> URL: https://issues.apache.org/jira/browse/FLINK-20184
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Hive, Documentation
>Reporter: Leonard Xu
>Assignee: Leonard Xu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.12.0
>
>
> The Hive streaming read and temporal table documents are out of date; we 
> need to update them.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-29646) SQL Gateway should return a simpler error message

2022-10-14 Thread yuanfenghu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29646?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuanfenghu updated FLINK-29646:
---
Description: 
SQL Gateway should return simpler exception information.
For example:
  If I execute a SQL statement through SQL Gateway but my statement has a 
syntax error: [ inset into tablea select * from tableb ]

When I get the exception information, the error returned by the server is too 
verbose to quickly find the key information. 
{code:java}
 {code}
 
The key information is:
{code:java}
org.apache.flink.sql.parser.impl.ParseException: Encountered "inset" at line 4, 
column 2. Was expecting one of:     "INSERT" ...     "UPSERT" ... {code}
However, it is difficult for the client to spot it quickly. I think SQL Gateway 
should have higher requirements for its exception messages.
 
 
 
 

  was:
SQL Gateway should return simpler exception information.
For example:
  If I execute a SQL statement through SQL Gateway but my statement has a 
syntax error: [ inset into tablea select * from tableb ]

When I get the exception information, the error returned by the server is too 
verbose to quickly find the key information. 
{code:java}
// code placeholder
 {code}
 
The key information is:
{code:java}
// code placeholder
org.apache.flink.sql.parser.impl.ParseException: Encountered "inset" at line 4, 
column 2. Was expecting one of:     "INSERT" ...     "UPSERT" ... {code}
However, it is difficult for the client to spot it quickly. I think SQL Gateway 
should have higher requirements for its exception messages.
 
 
 
 


> SQL Gateway should return a simpler error message
> -
>
> Key: FLINK-29646
> URL: https://issues.apache.org/jira/browse/FLINK-29646
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Gateway
>Affects Versions: 1.16.0
>Reporter: yuanfenghu
>Priority: Minor
> Fix For: 1.16.0
>
>
> SQL Gateway should return simpler exception information.
> For example:
>   If I execute a SQL statement through SQL Gateway but my statement has a 
> syntax error: [ inset into tablea select * from tableb ]
> When I get the exception information, the error returned by the server is 
> too verbose to quickly find the key information. 
> {code:java}
>  org.apache.flink.table.gateway.api.utils.SqlGatewayException: 
> org.apache.flink.table.gateway.api.utils.SqlGatewayException: Failed to 
> fetchResults.
>     at 
> org.apache.flink.table.gateway.rest.handler.statement.FetchResultsHandler.handleRequest(FetchResultsHandler.java:77)
>     at 
> org.apache.flink.table.gateway.rest.handler.AbstractSqlGatewayRestHandler.respondToRequest(AbstractSqlGatewayRestHandler.java:84)
>     at 
> org.apache.flink.table.gateway.rest.handler.AbstractSqlGatewayRestHandler.respondToRequest(AbstractSqlGatewayRestHandler.java:52)
>     at 
> org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:196)
>     at 
> org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.lambda$channelRead0$0(LeaderRetrievalHandler.java:83)
>     at java.util.Optional.ifPresent(Optional.java:159)
>     at 
> org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer.java:45)
>     at 
> org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:80)
>     at 
> org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:49)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>     at 
> org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(RouterHandler.java:115)
>     at 
> org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:94)
>     at 
> org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:55)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>     at 
> org.apache.flink.shaded.n

[jira] [Created] (FLINK-29646) SQL Gateway should return a simpler error message

2022-10-14 Thread yuanfenghu (Jira)
yuanfenghu created FLINK-29646:
--

 Summary: SQL Gateway should return a simpler error message
 Key: FLINK-29646
 URL: https://issues.apache.org/jira/browse/FLINK-29646
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Gateway
Affects Versions: 1.16.0
Reporter: yuanfenghu
 Fix For: 1.16.0


SQL Gateway should return simpler exception information.
For example:
  If I execute a SQL statement through SQL Gateway but my statement has a 
syntax error: [ inset into tablea select * from tableb ]

When I get the exception information, the error returned by the server is too 
verbose to quickly find the key information. 
{code:java}
// code placeholder
 {code}
 
The key information is:
{code:java}
// code placeholder
org.apache.flink.sql.parser.impl.ParseException: Encountered "inset" at line 4, 
column 2. Was expecting one of:     "INSERT" ...     "UPSERT" ... {code}
However, it is difficult for the client to spot it quickly. I think SQL Gateway 
should have higher requirements for its exception messages.
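
One possible direction (only a sketch of mine, not an agreed design): the 
gateway could unwrap the cause chain and return just the deepest message, 
e.g. the ParseException above:
{code:java}
public final class ErrorMessages {

    // Walk the cause chain and surface only the most specific message,
    // instead of the full wrapped stack trace.
    public static String rootCauseMessage(Throwable t) {
        Throwable cur = t;
        while (cur.getCause() != null && cur.getCause() != cur) {
            cur = cur.getCause();
        }
        return cur.getClass().getName() + ": " + cur.getMessage();
    }
}
{code}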
 
 
 
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-29646) SQL Gateway should return a simpler error message

2022-10-14 Thread yuanfenghu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29646?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuanfenghu updated FLINK-29646:
---
Description: 
SQL Gateway should return simpler exception information.
For example:
  If I execute a SQL statement through SQL Gateway but my statement has a 
syntax error: [ inset into tablea select * from tableb ]

When I get the exception information, the error returned by the server is too 
verbose to quickly find the key information. 
{code:java}
 {code}
 
The key information is:
{code:java}
org.apache.flink.sql.parser.impl.ParseException: Encountered "inset" at line 4, 
column 2. Was expecting one of:     "INSERT" ...     "UPSERT" ... {code}
However, it is difficult for the client to spot it quickly. I think SQL Gateway 
should have higher requirements for its exception messages.

[~Wencong Liu] 

  was:
SQL Gateway should return simpler exception information.
For example:
  If I execute a SQL statement through SQL Gateway but my statement has a 
syntax error: [ inset into tablea select * from tableb ]

When I get the exception information, the error returned by the server is too 
verbose to quickly find the key information. 
{code:java}
 {code}
 
The key information is:
{code:java}
org.apache.flink.sql.parser.impl.ParseException: Encountered "inset" at line 4, 
column 2. Was expecting one of:     "INSERT" ...     "UPSERT" ... {code}
However, it is difficult for the client to spot it quickly. I think SQL Gateway 
should have higher requirements for its exception messages.

[~Wencong Liu] 


> SQL Gateway should return a simpler error message
> -
>
> Key: FLINK-29646
> URL: https://issues.apache.org/jira/browse/FLINK-29646
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Gateway
>Affects Versions: 1.16.0
>Reporter: yuanfenghu
>Priority: Minor
> Fix For: 1.16.0
>
>
> SQL Gateway should return simpler exception information.
> For example:
>   If I execute a SQL statement through SQL Gateway but my statement has a 
> syntax error: [ inset into tablea select * from tableb ]
> When I get the exception information, the error returned by the server is 
> too verbose to quickly find the key information. 
> {code:java}
>  org.apache.flink.table.gateway.api.utils.SqlGatewayException: 
> org.apache.flink.table.gateway.api.utils.SqlGatewayException: Failed to 
> fetchResults.
>     at 
> org.apache.flink.table.gateway.rest.handler.statement.FetchResultsHandler.handleRequest(FetchResultsHandler.java:77)
>     at 
> org.apache.flink.table.gateway.rest.handler.AbstractSqlGatewayRestHandler.respondToRequest(AbstractSqlGatewayRestHandler.java:84)
>     at 
> org.apache.flink.table.gateway.rest.handler.AbstractSqlGatewayRestHandler.respondToRequest(AbstractSqlGatewayRestHandler.java:52)
>     at 
> org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:196)
>     at 
> org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.lambda$channelRead0$0(LeaderRetrievalHandler.java:83)
>     at java.util.Optional.ifPresent(Optional.java:159)
>     at 
> org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer.java:45)
>     at 
> org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:80)
>     at 
> org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:49)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>     at 
> org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(RouterHandler.java:115)
>     at 
> org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:94)
>     at 
> org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:55)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>     at 
> org.apache.flink.shaded

[jira] [Updated] (FLINK-29646) SQL Gateway should return a simpler error message

2022-10-14 Thread yuanfenghu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29646?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuanfenghu updated FLINK-29646:
---
Description: 
SQL Gateway should return simpler exception information.
For example:
  If I execute a SQL statement through SQL Gateway but my statement has a 
syntax error: [ inset into tablea select * from tableb ]

When I get the exception information, the error returned by the server is too 
verbose to quickly find the key information. 
{code:java}
 {code}
 
The key information is:
{code:java}
org.apache.flink.sql.parser.impl.ParseException: Encountered "inset" at line 4, 
column 2. Was expecting one of:     "INSERT" ...     "UPSERT" ... {code}
However, it is difficult for the client to spot it quickly. I think SQL Gateway 
should have higher requirements for its exception messages.

[~Wencong Liu] 

  was:
SQL Gateway should return simpler exception information.
For example:
  If I execute a SQL statement through SQL Gateway but my statement has a 
syntax error: [ inset into tablea select * from tableb ]

When I get the exception information, the error returned by the server is too 
verbose to quickly find the key information. 
{code:java}
 {code}
 
The key information is:
{code:java}
org.apache.flink.sql.parser.impl.ParseException: Encountered "inset" at line 4, 
column 2. Was expecting one of:     "INSERT" ...     "UPSERT" ... {code}
However, it is difficult for the client to spot it quickly. I think SQL Gateway 
should have higher requirements for its exception messages.
 
 
 
 


> SQL Gateway should return a simpler error message
> -
>
> Key: FLINK-29646
> URL: https://issues.apache.org/jira/browse/FLINK-29646
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Gateway
>Affects Versions: 1.16.0
>Reporter: yuanfenghu
>Priority: Minor
> Fix For: 1.16.0
>
>
> SQL Gateway should return simpler exception information.
> For example:
>   If I execute a SQL statement through SQL Gateway but my statement has a 
> syntax error: [ inset into tablea select * from tableb ]
> When I get the exception information, the error returned by the server is 
> too verbose to quickly find the key information. 
> {code:java}
>  org.apache.flink.table.gateway.api.utils.SqlGatewayException: 
> org.apache.flink.table.gateway.api.utils.SqlGatewayException: Failed to 
> fetchResults.
>     at 
> org.apache.flink.table.gateway.rest.handler.statement.FetchResultsHandler.handleRequest(FetchResultsHandler.java:77)
>     at 
> org.apache.flink.table.gateway.rest.handler.AbstractSqlGatewayRestHandler.respondToRequest(AbstractSqlGatewayRestHandler.java:84)
>     at 
> org.apache.flink.table.gateway.rest.handler.AbstractSqlGatewayRestHandler.respondToRequest(AbstractSqlGatewayRestHandler.java:52)
>     at 
> org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:196)
>     at 
> org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.lambda$channelRead0$0(LeaderRetrievalHandler.java:83)
>     at java.util.Optional.ifPresent(Optional.java:159)
>     at 
> org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer.java:45)
>     at 
> org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:80)
>     at 
> org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:49)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>     at 
> org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(RouterHandler.java:115)
>     at 
> org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:94)
>     at 
> org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:55)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>     at 
> org.apache.flink.shaded.netty4.i

[jira] [Updated] (FLINK-29646) SQL Gateway should return a simpler error message

2022-10-15 Thread yuanfenghu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29646?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuanfenghu updated FLINK-29646:
---
Priority: Critical  (was: Minor)

> SQL Gateway should return a simpler error message
> -
>
> Key: FLINK-29646
> URL: https://issues.apache.org/jira/browse/FLINK-29646
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Gateway
>Affects Versions: 1.16.0
>Reporter: yuanfenghu
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.16.0
>
>
> SQL Gateway should return simpler exception information.
> For example:
>   If I execute a SQL statement through SQL Gateway but my statement has a 
> syntax error: [ inset into tablea select * from tableb ]
> When I get the exception information, the error returned by the server is 
> too verbose to quickly find the key information. 
> {code:java}
>  org.apache.flink.table.gateway.api.utils.SqlGatewayException: 
> org.apache.flink.table.gateway.api.utils.SqlGatewayException: Failed to 
> fetchResults.
>     at 
> org.apache.flink.table.gateway.rest.handler.statement.FetchResultsHandler.handleRequest(FetchResultsHandler.java:77)
>     at 
> org.apache.flink.table.gateway.rest.handler.AbstractSqlGatewayRestHandler.respondToRequest(AbstractSqlGatewayRestHandler.java:84)
>     at 
> org.apache.flink.table.gateway.rest.handler.AbstractSqlGatewayRestHandler.respondToRequest(AbstractSqlGatewayRestHandler.java:52)
>     at 
> org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:196)
>     at 
> org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.lambda$channelRead0$0(LeaderRetrievalHandler.java:83)
>     at java.util.Optional.ifPresent(Optional.java:159)
>     at 
> org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer.java:45)
>     at 
> org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:80)
>     at 
> org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:49)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>     at 
> org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(RouterHandler.java:115)
>     at 
> org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:94)
>     at 
> org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:55)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>     at 
> org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>     at 
> org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:210)
>     at 
> org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:69)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>     at 
> org.apache.flink.shade

[jira] [Updated] (FLINK-29646) SQL Gateway should return a simpler error message

2022-10-15 Thread yuanfenghu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29646?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuanfenghu updated FLINK-29646:
---
Priority: Major  (was: Critical)

> SQL Gateway should return a simpler error message
> -
>
> Key: FLINK-29646
> URL: https://issues.apache.org/jira/browse/FLINK-29646
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Gateway
>Affects Versions: 1.16.0
>Reporter: yuanfenghu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0
>
>
> SQL Gateway should return simpler exception information.
> For example:
>   If I execute a SQL statement through SQL Gateway but my statement has a 
> syntax error: [ inset into tablea select * from tableb ]
> When I get the exception information, the error returned by the server is 
> too verbose to quickly find the key information. 
> {code:java}
>  org.apache.flink.table.gateway.api.utils.SqlGatewayException: 
> org.apache.flink.table.gateway.api.utils.SqlGatewayException: Failed to 
> fetchResults.
>     at 
> org.apache.flink.table.gateway.rest.handler.statement.FetchResultsHandler.handleRequest(FetchResultsHandler.java:77)
>     at 
> org.apache.flink.table.gateway.rest.handler.AbstractSqlGatewayRestHandler.respondToRequest(AbstractSqlGatewayRestHandler.java:84)
>     at 
> org.apache.flink.table.gateway.rest.handler.AbstractSqlGatewayRestHandler.respondToRequest(AbstractSqlGatewayRestHandler.java:52)
>     at 
> org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:196)
>     at 
> org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.lambda$channelRead0$0(LeaderRetrievalHandler.java:83)
>     at java.util.Optional.ifPresent(Optional.java:159)
>     at 
> org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer.java:45)
>     at 
> org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:80)
>     at 
> org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:49)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>     at 
> org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(RouterHandler.java:115)
>     at 
> org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:94)
>     at 
> org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:55)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>     at 
> org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>     at 
> org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:210)
>     at 
> org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:69)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>     at 
> org.apache.flink.shaded.n
