Stefan Richter created FLINK-5052:
-------------------------------------

             Summary: Changing the maximum parallelism (number of key groups) of a job
                 Key: FLINK-5052
                 URL: https://issues.apache.org/jira/browse/FLINK-5052
             Project: Flink
          Issue Type: Improvement
          Components: State Backends, Checkpointing
            Reporter: Stefan Richter


Through dynamic rescaling, Flink jobs can already adjust their parallelism, and each operator only has to read its assigned key-groups.

However, the maximum parallelism is determined by the number of key-groups (aka {{maxParallelism}}), which is currently fixed forever once the job is first started. We could consider relaxing this limitation, so that users can modify the number of key-groups after the fact. This is useful in particular for upscaling jobs from older Flink versions (< 1.2), which must be converted with {{maxParallelism == parallelism}}.

In the general case, changing the maxParallelism shuffles keys between key-groups. As a consequence, we would have to read the complete state on each operator instance and filter for those keys that actually fall into the key-groups assigned to that instance. While it is certainly possible to support this, it is obviously a very expensive operation.

Fortunately, the assignment of keys to operators is currently determined as 
follows:

{{operatorInstance = computeKeyGroup(key) * parallelism / maxParallelism}}
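
For illustration, here is a minimal, self-contained sketch of this assignment logic in Java. Class and method names are made up for this example, and the hash is simplified to {{hashCode()}} modulo {{maxParallelism}} (Flink additionally applies a murmur hash), so this is not the actual implementation:

{code:java}
// Minimal, self-contained sketch of the assignment described above.
// Names and the hash function are illustrative only.
public final class KeyGroupAssignmentSketch {

    // key -> key-group: a hash of the key modulo maxParallelism
    static int computeKeyGroup(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // key-group -> operator instance: keyGroup * parallelism / maxParallelism
    static int operatorForKeyGroup(int keyGroup, int parallelism, int maxParallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int parallelism = 4;
        int maxParallelism = 128;
        int keyGroup = computeKeyGroup("some-key", maxParallelism);
        System.out.println("key-group " + keyGroup + " -> operator instance "
                + operatorForKeyGroup(keyGroup, parallelism, maxParallelism));
    }
}
{code}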

Because of this assignment, we can provide more efficient support for upscaling the maxParallelism if {{newMaxParallelism == n * oldMaxParallelism}}. In this case, keys are not reshuffled between key-groups; instead, each old key-group is split into n new key-groups. On restore, an operator instance only has to read the old key-groups that contain its new key-groups, which significantly reduces the amount of unnecessary data transfer: of the data that is still read, only ~1/2 has to be filtered out when increasing maxParallelism by a factor of 2, ~2/3 for a factor of 3, etc.
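
To make the mapping concrete, here is a hedged sketch (all names are made up for illustration): since {{oldMaxParallelism}} divides {{newMaxParallelism}}, the old key-group containing the state of a new key-group is simply {{newKeyGroup % oldMaxParallelism}}, and the set of old key-groups an operator instance has to read on restore follows directly from its new key-group range:

{code:java}
import java.util.Set;
import java.util.TreeSet;

public final class UpscalingRestoreSketch {

    // With newMaxParallelism == n * oldMaxParallelism, a key's new key-group
    // (hash % newMaxParallelism) taken modulo oldMaxParallelism equals its old
    // key-group (hash % oldMaxParallelism). Hence the old key-group containing
    // a given new key-group is:
    static int oldKeyGroupFor(int newKeyGroup, int oldMaxParallelism) {
        return newKeyGroup % oldMaxParallelism;
    }

    // Old key-groups an operator instance has to read on restore: only those
    // that contain at least one of the new key-groups it owns under the
    // assignment formula keyGroup * parallelism / maxParallelism.
    static Set<Integer> requiredOldKeyGroups(
            int operatorIndex, int parallelism, int newMaxParallelism, int oldMaxParallelism) {

        Set<Integer> oldGroups = new TreeSet<>();
        for (int g = 0; g < newMaxParallelism; g++) {
            if (g * parallelism / newMaxParallelism == operatorIndex) {
                oldGroups.add(oldKeyGroupFor(g, oldMaxParallelism));
            }
        }
        return oldGroups;
    }

    public static void main(String[] args) {
        // Example: upscaling maxParallelism from 128 to 256 (n = 2) with parallelism 4.
        System.out.println(requiredOldKeyGroups(0, 4, 256, 128));
    }
}
{code}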

Implementing this feature would require the following steps:
        - Introduce/modify state handles with the capability to summarize multiple logical key-groups into one mixed physical entity.
        - Enhance the {{StateAssignmentOperation}} so that it can deal with and correctly assign the new/modified keyed state handles to subtasks when restoring a checkpoint. We also need to implement how to compute the correct super-key-group, but this is rather simple.
        - Filter out keys that do not belong to an operator instance's assigned key-groups when restoring in the backends (a rough sketch of this step follows below).
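
As a rough illustration of the last step (the state representation and names are hypothetical, not an actual backend API), restoring one old key-group could filter its entries like this:

{code:java}
import java.util.HashMap;
import java.util.Map;

public final class RestoreFilterSketch {

    // Hypothetical filtering step on restore: from the state of one old
    // key-group (modeled here as a simple key/value map), keep only the
    // entries whose key falls into a new key-group owned by this operator
    // instance. The hash is simplified; Flink applies a murmur hash first.
    static <K, V> Map<K, V> filterRestoredKeyGroup(
            Map<K, V> oldKeyGroupState,
            int operatorIndex, int parallelism, int newMaxParallelism) {

        Map<K, V> kept = new HashMap<>();
        for (Map.Entry<K, V> e : oldKeyGroupState.entrySet()) {
            int newKeyGroup = Math.floorMod(e.getKey().hashCode(), newMaxParallelism);
            // Same assignment formula as above: keyGroup * parallelism / maxParallelism.
            if (newKeyGroup * parallelism / newMaxParallelism == operatorIndex) {
                kept.put(e.getKey(), e.getValue());
            }
        }
        return kept;
    }
}
{code}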




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
