[ https://issues.apache.org/jira/browse/FLINK-23190 ]
yuanfenghu deleted comment on FLINK-23190:
------------------------------------

was (Author: JIRAUSER296932):
[~loyi] No, I just use
```
// Reorders the ExecutionSlotSharingGroups so that groups hosting different
// sets of job vertices are interleaved instead of being requested back-to-back.
private Collection<ExecutionSlotSharingGroup> sortExecutionSlotSharingGroup(
        Collection<ExecutionSlotSharingGroup> executionVertexGroups) {
    // Bucket the slot sharing groups by the set of JobVertexIDs they contain
    // (getJobVertexSharingGroup is a helper that extracts that set, not shown here).
    Map<Set<JobVertexID>, List<ExecutionSlotSharingGroup>> jobVertexGroups =
            executionVertexGroups.stream()
                    .collect(Collectors.groupingBy(this::getJobVertexSharingGroup));
    // Order the buckets so that the groups with the most job vertices come first.
    List<List<ExecutionSlotSharingGroup>> groups = jobVertexGroups.entrySet().stream()
            .sorted((g1, g2) -> Integer.compare(g2.getKey().size(), g1.getKey().size()))
            .map(Map.Entry::getValue)
            .collect(Collectors.toList());
    // Round-robin across the buckets so that consecutive slot requests belong to
    // different job-vertex groups.
    List<ExecutionSlotSharingGroup> sorted = new ArrayList<>();
    int i = executionVertexGroups.size(), j = 0;
    while (i > 0) {
        List<ExecutionSlotSharingGroup> group = groups.get((j++) % groups.size());
        if (group.isEmpty()) {
            continue;
        }
        i--;
        sorted.add(group.remove(0));
    }
    return sorted;
}
```
and the Flink version I use is 1.16.2. Is this different from your experimental environment?

> Make task-slot allocation much more evenly
> -------------------------------------------
>
>                 Key: FLINK-23190
>                 URL: https://issues.apache.org/jira/browse/FLINK-23190
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Coordination
>    Affects Versions: 1.14.0, 1.12.3, 1.13.1
>            Reporter: loyi
>            Assignee: loyi
>            Priority: Major
>              Labels: pull-request-available, stale-assigned
>         Attachments: image-2021-07-16-10-34-30-700.png
>
>
> FLINK-12122 only guarantees spreading out tasks across the set of TMs that are registered at the time of scheduling, but our jobs all run in active YARN mode, and jobs with a smaller source parallelism often cause load-balancing issues.
>
> For this job:
> {code:java}
> // -ys 4 (4 slots per TaskManager) means 10 TaskManagers
> env.addSource(...).name("A").setParallelism(10)
>     .map(...).name("B").setParallelism(30)
>     .map(...).name("C").setParallelism(40)
>     .addSink(...).name("D").setParallelism(20);
> {code}
>
> Flink-1.12.3 task allocation:
> ||operator||tm1||tm2||tm3||tm4||tm5||tm6||tm7||tm8||tm9||tm10||
> |A|1|{color:#de350b}2{color}|{color:#de350b}2{color}|1|1|{color:#de350b}3{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|{color:#de350b}0{color}|
> |B|3|3|3|3|3|3|3|3|{color:#de350b}2{color}|{color:#de350b}4{color}|
> |C|4|4|4|4|4|4|4|4|4|4|
> |D|2|2|2|2|2|{color:#de350b}1{color}|{color:#de350b}1{color}|2|2|{color:#de350b}4{color}|
>
> Suggestions:
> When a TaskManager starts registering its slots with the SlotManager, the current processing logic chooses the first pendingSlot that meets the resource requirements. This effectively random strategy usually causes uneven task allocation when the source operator's parallelism is significantly below the processing operators'.
> A simple, feasible idea is to {color:#de350b}partition{color} the current "{color:#de350b}pendingSlots{color}" by their "JobVertexIds" (for example, let the AllocationID carry that detail), and then allocate the slots proportionally to each JobVertexGroup.
>
> For the above case, the 40 pendingSlots could be divided into 4 groups:
> [ABCD]: 10  // A, B, C, D represent {color:#de350b}jobVertexId{color}s
> [BCD]: 10
> [CD]: 10
> [D]: 10
>
> Every TaskManager provides 4 slots at a time, and each group gets 1 slot according to its proportion (1/4), so the final allocation result is:
> [ABCD]: deployed on 10 different TaskManagers
> [BCD]: deployed on 10 different TaskManagers
> [CD]: deployed on 10 different TaskManagers
> [D]: deployed on 10 different TaskManagers
>
> I have implemented a [concept code|https://github.com/saddays/flink/commit/dc82e60a7c7599fbcb58c14f8e3445bc8d07ace1] based on Flink-1.12.3. The patched version achieves {color:#de350b}fully even{color} task allocation and works well on my workload. Are there other points that have not been considered, or does it conflict with future plans? Sorry for my poor English.
>


--
This message was sent by Atlassian Jira
(v8.20.10#820010)
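For illustration, here is a minimal, self-contained sketch of the partition-and-round-robin ordering described in the issue above. It is not the attached concept code and does not use Flink's internal classes: the `Set<String>` representation of a pending slot request and the class name `EvenSlotOrderingSketch` are hypothetical stand-ins for Flink's pendingSlot / ExecutionSlotSharingGroup types.

{code:java}
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class EvenSlotOrderingSketch {

    static List<Set<String>> interleaveByVertexGroup(List<Set<String>> pendingRequests) {
        // Bucket the pending requests by the set of job vertices they will host,
        // e.g. [ABCD], [BCD], [CD], [D] in the example from the issue description.
        Map<Set<String>, Deque<Set<String>>> buckets = new LinkedHashMap<>();
        for (Set<String> request : pendingRequests) {
            buckets.computeIfAbsent(request, k -> new ArrayDeque<>()).add(request);
        }

        // Largest vertex sets first, mirroring the sorting in the comment above.
        List<Deque<Set<String>>> ordered = new ArrayList<>(buckets.values());
        ordered.sort((a, b) -> Integer.compare(b.peek().size(), a.peek().size()));

        // Round-robin across the buckets: consecutive requests belong to different
        // groups, so a TaskManager offering N slots at once receives roughly
        // N / (#groups) requests from each group.
        List<Set<String>> result = new ArrayList<>();
        int remaining = pendingRequests.size();
        for (int i = 0; remaining > 0; i++) {
            Deque<Set<String>> bucket = ordered.get(i % ordered.size());
            if (!bucket.isEmpty()) {
                result.add(bucket.poll());
                remaining--;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // The example from the description: 10 requests each of [ABCD], [BCD], [CD], [D].
        List<Set<String>> pending = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            pending.add(Set.of("A", "B", "C", "D"));
            pending.add(Set.of("B", "C", "D"));
            pending.add(Set.of("C", "D"));
            pending.add(Set.of("D"));
        }
        // With -ys 4, one TaskManager takes 4 consecutive requests, i.e. one per group.
        interleaveByVertexGroup(pending).subList(0, 4).forEach(System.out::println);
    }
}
{code}

The first four requests printed belong to four different vertex groups, which corresponds to the 1-slot-per-group (1/4) allocation worked out in the example above.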